
Commit 51cc027

JamesMurkin authored
Move reconcile to run in scheduling_algo (#4594)
Currently there is a race between the reconciler and the nodes used in scheduling_algo.

The reconciler ensures the details of scheduled jobs still match the nodes they are scheduled on (such as the pool of a run matching the node's pool). If the details no longer match, the jobs are removed (preempted/failed). This reconciliation exists so that core scheduling acts properly: it assumes jobs still match the details of their nodes. A key source of issues is gangs that end up with jobs across multiple pools, because the pool of one of the nodes has changed.

The issue is that the reconciler runs in scheduler.go while core scheduling runs in scheduling_algo, and the nodes used by scheduling_algo can get updated between the reconcile running and the core scheduler running.

To fix this, the reconciler is now run in scheduling_algo, just before the core scheduler acts on the validated nodes/jobs. This ensures no race can occur and the scheduler always acts on "valid" state when the reconciler is enabled.

---------

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
1 parent 7107ee2 commit 51cc027
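To make the race concrete before the diff: the sketch below is a minimal, standalone illustration of the ordering fix, with all names and types invented for this example (they are not Armada's APIs). Reconciling inside the scheduling pass means the validity check and the scheduling decision see the same node snapshot, so no node update can land in between.

// Minimal standalone sketch of the ordering fix; all types and names here are
// hypothetical, not Armada's.
package main

import "fmt"

type node struct{ pool string }

type run struct {
	nodeName string
	pool     string
}

// reconcile partitions runs into those whose recorded pool still matches the
// node snapshot and those that must be preempted/failed.
func reconcile(snapshot map[string]node, runs []run) (valid, invalid []run) {
	for _, r := range runs {
		if n, ok := snapshot[r.nodeName]; ok && n.pool == r.pool {
			valid = append(valid, r)
		} else {
			invalid = append(invalid, r)
		}
	}
	return valid, invalid
}

// scheduleCycle captures a snapshot and reconciles inside the same pass, so
// the scheduler only ever acts on runs validated against that exact snapshot.
func scheduleCycle(snapshot map[string]node, runs []run) {
	valid, invalid := reconcile(snapshot, runs)
	fmt.Printf("scheduling against %d valid runs, removing %d invalid runs\n",
		len(valid), len(invalid))
}

func main() {
	// n2 moved from pool "gpu" to "cpu" since its run was scheduled.
	snapshot := map[string]node{"n1": {pool: "gpu"}, "n2": {pool: "cpu"}}
	runs := []run{{nodeName: "n1", pool: "gpu"}, {nodeName: "n2", pool: "gpu"}}
	scheduleCycle(snapshot, runs)
}

Run as-is, the second run is flagged invalid because its recorded pool no longer matches node n2's pool.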

File tree

12 files changed: +565 -317 lines changed

internal/scheduler/metrics/cycle_metrics.go

Lines changed: 9 additions & 0 deletions

@@ -580,6 +580,15 @@ func (m *cycleMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result
 			}
 		}
 	}
+
+	if result.FailedReconciliationJobs != nil {
+		for _, info := range result.FailedReconciliationJobs.FailedJobs {
+			m.ReportJobPreemptedWithType(info.Job, context.PreemptedViaNodeReconciler)
+		}
+		for _, info := range result.FailedReconciliationJobs.PreemptedJobs {
+			m.ReportJobPreemptedWithType(info.Job, context.PreemptedViaNodeReconciler)
+		}
+	}
 	m.latestCycleMetrics.Store(currentCycle)
 
 	m.publishCycleMetrics(ctx, result)

internal/scheduler/metrics/state_metrics.go

Lines changed: 1 addition & 0 deletions

@@ -286,6 +286,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio
 	for _, res := range m.trackedResourceNames {
 		resQty := requests.GetByNameZeroIfMissing(string(res))
 		resSeconds := duration * float64(resQty.MilliValue()) / 1000
+		resSeconds = math.Max(resSeconds, 0)
 		m.jobStateResourceSecondsByQueue.
 			WithLabelValues(queue, pool, state, priorState, res.String()).Add(resSeconds)
 		m.jobStateResourceSecondsByNode.
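The one-line clamp above presumably guards against a negative duration (for example, from skewed state timestamps): if these metrics are Prometheus counters, Add panics on a negative value. A standalone sketch of the arithmetic, with illustrative numbers (the helper below is hypothetical, not Armada code):

// Standalone sketch of the clamp above; the helper and numbers are
// illustrative only.
package main

import (
	"fmt"
	"math"
)

// resourceSeconds converts a state duration and a resource quantity (in
// milli-units) into resource-seconds, clamped so a negative duration can
// never produce a negative counter increment.
func resourceSeconds(durationSeconds float64, milliValue int64) float64 {
	resSeconds := durationSeconds * float64(milliValue) / 1000
	return math.Max(resSeconds, 0)
}

func main() {
	fmt.Println(resourceSeconds(2.5, 500))  // 1.25 resource-seconds
	fmt.Println(resourceSeconds(-0.1, 500)) // clamped to 0
}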

internal/scheduler/scheduler.go

Lines changed: 36 additions & 105 deletions

@@ -2,13 +2,11 @@ package scheduler
 
 import (
 	"fmt"
-	"strings"
 	"time"
 
 	"github.com/gogo/protobuf/types"
 	"github.com/google/uuid"
 	"github.com/pkg/errors"
-	"golang.org/x/exp/maps"
 	"golang.org/x/exp/slices"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/utils/clock"
@@ -91,7 +89,6 @@ type Scheduler struct {
 	// A list of the pools that are market driven
 	// Used to know which jobs need update when updating job prices
 	marketDrivenPools []string
-	runNodeReconciler JobRunNodeReconciler
 }
 
 func NewScheduler(
@@ -112,7 +109,6 @@ func NewScheduler(
 	metrics *metrics.Metrics,
 	bidPriceProvider pricing.BidPriceProvider,
 	marketDrivenPools []string,
-	runNodeReconciler JobRunNodeReconciler,
 ) (*Scheduler, error) {
 	return &Scheduler{
 		jobRepository: jobRepository,
@@ -135,7 +131,6 @@ func NewScheduler(
 		runsSerial:        -1,
 		metrics:           metrics,
 		marketDrivenPools: marketDrivenPools,
-		runNodeReconciler: runNodeReconciler,
 	}, nil
 }
 
@@ -352,14 +347,6 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
 	// Schedule jobs.
 	if shouldSchedule {
 		start := time.Now()
-
-		preemptionEvents, err := s.removeRunsThatNoLongerReconcile(ctx, txn)
-		if err != nil {
-			return overallSchedulerResult, err
-		}
-		ctx.Infof("Finished reconciling runs with nodes, generating %d preemption events", len(preemptionEvents))
-		events = append(events, preemptionEvents...)
-
 		resourceUnits, err := s.updateJobPrices(ctx, txn)
 		if err != nil {
 			return overallSchedulerResult, err
@@ -634,6 +621,9 @@ func EventsFromSchedulerResult(result *scheduling.SchedulerResult, time time.Tim
 	if err != nil {
 		return nil, err
 	}
+
+	eventSequences = AppendEventSequencesFromReconciliationFailureJobs(eventSequences, result.FailedReconciliationJobs, time)
+
 	return eventSequences, nil
 }
 
@@ -728,6 +718,39 @@ func createEventsForPreemptedJob(jobId string, runId string, reason string, time
 	}
 }
 
+func AppendEventSequencesFromReconciliationFailureJobs(eventSequences []*armadaevents.EventSequence, reconciliationResult *scheduling.ReconciliationResult, time time.Time) []*armadaevents.EventSequence {
+	if reconciliationResult == nil {
+		return eventSequences
+	}
+	for _, jobInfo := range reconciliationResult.FailedJobs {
+		reconciliationError := &armadaevents.Error{
+			Terminal: true,
+			Reason: &armadaevents.Error_ReconciliationError{
+				ReconciliationError: &armadaevents.ReconciliationError{
+					Message: jobInfo.Reason,
+				},
+			},
+		}
+		es := &armadaevents.EventSequence{
+			Queue:      jobInfo.Job.Queue(),
+			JobSetName: jobInfo.Job.Jobset(),
+			Events:     createEventsForFailedJob(jobInfo.Job.Id(), jobInfo.Job.Id(), reconciliationError, time),
+		}
+		eventSequences = append(eventSequences, es)
+	}
+
+	for _, jobInfo := range reconciliationResult.PreemptedJobs {
+		es := &armadaevents.EventSequence{
+			Queue:      jobInfo.Job.Queue(),
+			JobSetName: jobInfo.Job.Jobset(),
+			Events:     createEventsForPreemptedJob(jobInfo.Job.Id(), jobInfo.Job.Id(), jobInfo.Reason, time),
+		}
+		eventSequences = append(eventSequences, es)
+	}
+
+	return eventSequences
+}
+
 func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventSequence, jctxs []*schedulercontext.JobSchedulingContext) ([]*armadaevents.EventSequence, error) {
 	for _, jctx := range jctxs {
 		job := jctx.Job
@@ -1000,98 +1023,6 @@ func (s *Scheduler) generateUpdateMessagesFromJob(ctx *armadacontext.Context, jo
 	return nil, nil
 }
 
-func (s *Scheduler) removeRunsThatNoLongerReconcile(ctx *armadacontext.Context, txn *jobdb.Txn) ([]*armadaevents.EventSequence, error) {
-	invalidJobs, err := s.runNodeReconciler.ReconcileJobRuns(ctx, txn)
-	if err != nil {
-		return nil, fmt.Errorf("failed to reconcile runs with nodes because - %s", err)
-	}
-	jobsUpdated := make(map[string]*jobdb.Job, len(invalidJobs))
-	gangsPreempted := map[gangKey][]string{}
-	events := make([]*armadaevents.EventSequence, 0, len(invalidJobs))
-	for _, invalidJobInfo := range invalidJobs {
-		job := invalidJobInfo.Job
-		if job.InTerminalState() || job.Queued() || job.LatestRun() == nil {
-			continue
-		}
-
-		updatedJob, jobEvents := s.processJobReconciliationFailure(job, invalidJobInfo.Reason)
-		jobsUpdated[job.Id()] = updatedJob
-		events = append(events, jobEvents...)
-
-		if job.IsInGang() && job.PriorityClass().Preemptible {
-			key := gangKey{job.Queue(), job.GetGangInfo().Id()}
-			if _, exists := gangsPreempted[key]; !exists {
-				gangsPreempted[key] = []string{}
-			}
-			gangsPreempted[key] = append(gangsPreempted[key], job.Id())
-		}
-	}
-
-	for gang, jobIds := range gangsPreempted {
-		jobs, err := txn.GetGangJobsByGangId(gang.queue, gang.gangId)
-		if err != nil {
-			return nil, fmt.Errorf("failed to process failed reconciliation gang jobs because - %s", err)
-		}
-
-		for _, job := range jobs {
-			_, alreadyProcessed := jobsUpdated[job.Id()]
-			if alreadyProcessed || job.InTerminalState() || !job.PriorityClass().Preemptible {
-				continue
-			}
-
-			reason := fmt.Sprintf("other jobs in the gang failed reconciliation (%s)", strings.Join(jobIds, ","))
-			updatedJob, jobEvents := s.processJobReconciliationFailure(job, reason)
-			jobsUpdated[job.Id()] = updatedJob
-			events = append(events, jobEvents...)
-		}
-	}
-
-	if err := txn.Upsert(maps.Values(jobsUpdated)); err != nil {
-		return nil, err
-	}
-	return events, nil
-}
-
-func (s *Scheduler) processJobReconciliationFailure(job *jobdb.Job, reason string) (*jobdb.Job, []*armadaevents.EventSequence) {
-	now := s.clock.Now()
-
-	job = job.WithQueued(false).WithFailed(true)
-	run := job.LatestRun()
-	job = job.WithUpdatedRun(run.WithFailed(true))
-
-	if job.PriorityClass().Preemptible {
-		run = run.WithPreemptedTime(&now)
-	}
-
-	events := make([]*armadaevents.EventSequence, 0, 3)
-	if job.PriorityClass().Preemptible {
-		s.metrics.ReportJobPreemptedWithType(job, schedulercontext.PreemptedViaNodeReconciler)
-		es := &armadaevents.EventSequence{
-			Queue:      job.Queue(),
-			JobSetName: job.Jobset(),
-			Events:     createEventsForPreemptedJob(job.Id(), run.Id(), reason, now),
-		}
-		events = append(events, es)
-	} else {
-		reconciliationError := &armadaevents.Error{
-			Terminal: true,
-			Reason: &armadaevents.Error_ReconciliationError{
-				ReconciliationError: &armadaevents.ReconciliationError{
-					Message: reason,
-				},
-			},
-		}
-		es := &armadaevents.EventSequence{
-			Queue:      job.Queue(),
-			JobSetName: job.Jobset(),
-			Events:     createEventsForFailedJob(job.Id(), run.Id(), reconciliationError, now),
-		}
-		events = append(events, es)
-	}
-
-	return job, events
-}
-
 // expireJobsIfNecessary removes any jobs from the JobDb which are running on stale executors.
 // It also generates an EventSequence for each job, indicating that both the run and the job has failed
 // Note that this is different behaviour from the old scheduler which would allow expired jobs to be rerun
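For readers following the event flow: the new AppendEventSequencesFromReconciliationFailureJobs above is nil-safe and emits a terminal failure per failed job and a preemption per preempted job. The standalone sketch below mirrors that shape with simplified stand-in types (not the real armadaevents or scheduling types):

// Hypothetical mirror of the new event flow; all types are illustrative
// stand-ins, not Armada's actual armadaevents/scheduling types.
package main

import "fmt"

type jobInfo struct{ jobId, reason string }

type reconciliationResult struct {
	failedJobs    []jobInfo
	preemptedJobs []jobInfo
}

type schedulerResult struct {
	failedReconciliationJobs *reconciliationResult
}

type event struct{ kind, jobId, reason string }

// appendReconciliationEvents mirrors the real function's shape: nil-safe,
// one terminal failure per failed job, one preemption per preempted job.
func appendReconciliationEvents(events []event, r *reconciliationResult) []event {
	if r == nil {
		return events
	}
	for _, j := range r.failedJobs {
		events = append(events, event{"failed", j.jobId, j.reason})
	}
	for _, j := range r.preemptedJobs {
		events = append(events, event{"preempted", j.jobId, j.reason})
	}
	return events
}

func main() {
	result := schedulerResult{failedReconciliationJobs: &reconciliationResult{
		failedJobs:    []jobInfo{{"job-a", "node pool changed"}},
		preemptedJobs: []jobInfo{{"job-b", "gang member failed reconciliation"}},
	}}
	events := appendReconciliationEvents(nil, result.failedReconciliationJobs)
	fmt.Println(events)
}

Because the nil check short-circuits, callers running with the reconciler disabled pass nil and the event stream is unchanged.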
