Commit 189e471

Handle scheduling failures more gracefully (#4486)
Currently, if we have a scheduling failure, we continuously run scheduling rounds until we succeed.
- Often it won't succeed, as the state of the system can't change much while no further reconciliations are run.

Now we continue to run reconciliation rounds between scheduling rounds, even if the scheduling round fails.
- We do this by setting `previousSchedulingRoundEnd` even if the scheduling round failed.

This allows the system to continue working even if we have a bug:
- It also allows operators to cancel the jobs causing the bug, or lets the state evolve so that future scheduling rounds may not exhibit the bug.

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
1 parent 5c16dad commit 189e471

1 file changed: +25 −24 lines

internal/scheduler/scheduler.go

Lines changed: 25 additions & 24 deletions
@@ -66,8 +66,6 @@ type Scheduler struct {
 	// Used to penalize short-running jobs by pretending they
 	// ran for some minimum length when calculating costs.
 	shortJobPenalty *scheduling.ShortJobPenalty
-	// The time the previous scheduling round ended
-	previousSchedulingRoundEnd time.Time
 	// Used for timing decisions (e.g., sleep).
 	// Injected here so that we can mock it out for testing.
 	clock clock.WithTicker
@@ -110,26 +108,25 @@ func NewScheduler(
 	marketDrivenPools []string,
 ) (*Scheduler, error) {
 	return &Scheduler{
-		jobRepository:              jobRepository,
-		executorRepository:         executorRepository,
-		schedulingAlgo:             schedulingAlgo,
-		leaderController:           leaderController,
-		publisher:                  publisher,
-		submitChecker:              submitChecker,
-		jobDb:                      jobDb,
-		clock:                      clock.RealClock{},
-		cyclePeriod:                cyclePeriod,
-		schedulePeriod:             schedulePeriod,
-		previousSchedulingRoundEnd: time.Time{},
-		executorTimeout:            executorTimeout,
-		bidPriceProvider:           bidPriceProvider,
-		shortJobPenalty:            shortJobPenalty,
-		maxAttemptedRuns:           maxAttemptedRuns,
-		nodeIdLabel:                nodeIdLabel,
-		jobsSerial:                 -1,
-		runsSerial:                 -1,
-		metrics:                    metrics,
-		marketDrivenPools:          marketDrivenPools,
+		jobRepository:      jobRepository,
+		executorRepository: executorRepository,
+		schedulingAlgo:     schedulingAlgo,
+		leaderController:   leaderController,
+		publisher:          publisher,
+		submitChecker:      submitChecker,
+		jobDb:              jobDb,
+		clock:              clock.RealClock{},
+		cyclePeriod:        cyclePeriod,
+		schedulePeriod:     schedulePeriod,
+		executorTimeout:    executorTimeout,
+		bidPriceProvider:   bidPriceProvider,
+		shortJobPenalty:    shortJobPenalty,
+		maxAttemptedRuns:   maxAttemptedRuns,
+		nodeIdLabel:        nodeIdLabel,
+		jobsSerial:         -1,
+		runsSerial:         -1,
+		metrics:            metrics,
+		marketDrivenPools:  marketDrivenPools,
 	}, nil
 }
 
@@ -152,6 +149,8 @@ func (s *Scheduler) Run(ctx *armadacontext.Context) error {
 
 	ticker := s.clock.NewTicker(s.cyclePeriod)
 	prevLeaderToken := leader.InvalidLeaderToken()
+
+	previousSchedulingRoundEnd := time.Time{}
 	cycleNumber := 0
 	for {
 		select {
@@ -188,12 +187,15 @@ func (s *Scheduler) Run(ctx *armadacontext.Context) error {
 			// and we must invalidate the held leader token to trigger flushing Pulsar at the next cycle.
 			//
 			// TODO: Once the Pulsar client supports transactions, we can guarantee consistency even in case of errors.
-			shouldSchedule := s.clock.Now().Sub(s.previousSchedulingRoundEnd) > s.schedulePeriod
+			shouldSchedule := s.clock.Now().Sub(previousSchedulingRoundEnd) > s.schedulePeriod
 			if !shouldSchedule {
 				ctx.Info("Won't schedule this cycle as still within schedulePeriod")
 			}
 
 			result, err := s.cycle(ctx, fullUpdate, leaderToken, shouldSchedule, cycleNumber)
+			if shouldSchedule {
+				previousSchedulingRoundEnd = s.clock.Now()
+			}
 			if err != nil {
 				ctx.Logger().WithStacktrace(err).Error("scheduling cycle failure")
 				leaderToken = leader.InvalidLeaderToken()
@@ -344,7 +346,6 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
 			return overallSchedulerResult, err
 		}
 		events = append(events, resultEvents...)
-		s.previousSchedulingRoundEnd = s.clock.Now()
 
 		overallSchedulerResult = *result
 	}
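
For readers skimming the diff, here is a minimal, self-contained sketch of the loop pattern this change introduces. The names and values (runCycle, the two periods, the fixed iteration count) are illustrative stand-ins, not Armada's actual types or signatures; the point is only that previousSchedulingRoundEnd now lives in the run loop and is advanced whenever a scheduling round was attempted, even if it failed.

// Minimal sketch of the loop pattern from this commit (simplified; runCycle,
// the periods, and the loop bound are stand-ins, not Armada's real code).
package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	const (
		cyclePeriod    = 10 * time.Millisecond // reconciliation cadence
		schedulePeriod = 50 * time.Millisecond // minimum gap between scheduling rounds
	)

	// Tracked as a local variable of the run loop rather than as struct state.
	previousSchedulingRoundEnd := time.Time{}

	ticker := time.NewTicker(cyclePeriod)
	defer ticker.Stop()

	for i := 0; i < 20; i++ {
		<-ticker.C

		shouldSchedule := time.Since(previousSchedulingRoundEnd) > schedulePeriod

		err := runCycle(shouldSchedule)

		// Key change: advance the timestamp whenever a scheduling round was
		// attempted, even if it failed, so the following cycles fall back to
		// reconciliation-only work instead of retrying scheduling every cycle.
		if shouldSchedule {
			previousSchedulingRoundEnd = time.Now()
		}

		if err != nil {
			fmt.Println("cycle failed:", err)
		}
	}
}

// runCycle stands in for Scheduler.cycle: it always reconciles, and only
// schedules when shouldSchedule is true. Here it always fails the scheduling
// round, to show that reconciliation-only cycles still run in between.
func runCycle(shouldSchedule bool) error {
	if shouldSchedule {
		return errors.New("scheduling round failed")
	}
	fmt.Println("reconciliation-only cycle")
	return nil
}

Moving the timestamp from the Scheduler struct into a local variable in Run also makes it explicit that only the run loop advances it, which is why the assignment in cycle could be deleted.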
