Skip to content

Commit af539bc

Browse files
authored
feat(scheduler): add per-pool scheduling outcome metrics (#4591)
<!-- Thanks for sending a pull request! Here are some tips for you: -->

#### What type of PR is this?

Enhancement

#### What this PR does / why we need it

Adds per-pool scheduling metrics to track success/failure outcomes for each pool independently. Currently, a scheduling failure in one pool causes the entire cycle to fail with a single error. These metrics enable:

1. Identifying which pool is failing
2. Alerting on specific pool failures
3. Tracking pool health over time

**New metrics:**

- `armada_scheduler_pool_scheduling_outcome` - counter with labels `pool`, `outcome` (success/failure), and `termination_reason` (completed, timeout, rate_limit, max_resources, error)

#### Which issue(s) this PR fixes

<!-- *Automatically closes linked issue when PR is merged. Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`. _If PR is about `failing-tests or flakes`, please post the related issues/tests in a comment and do not use `Fixes`_* -->

Fixes #

#### Special notes for your reviewer

Signed-off-by: Dejan Zele Pejchev <pejcev.dejan@gmail.com>
1 parent 56aca92 commit af539bc

File tree

7 files changed

+140
-13
lines changed

7 files changed

+140
-13
lines changed

internal/scheduler/metrics/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ const (
2323
reservationLabel = "reservation"
2424
jobShapeLabel = "job_shape"
2525
unschedulableReasonLabel = "unschedulable_reason"
26+
outcomeLabel = "outcome"
27+
terminationReasonLabel = "termination_reason"
28+
29+
PoolSchedulingOutcomeSuccess = "success"
30+
PoolSchedulingOutcomeFailure = "failure"
2631

2732
// Job state strings
2833
queued = "queued"

internal/scheduler/metrics/cycle_metrics.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var (
3030
poolAndShapeLabels = []string{poolLabel, jobShapeLabel}
3131
poolAndShapeAndReasonLabels = []string{poolLabel, jobShapeLabel, unschedulableReasonLabel}
3232
poolQueueAndResourceLabels = []string{poolLabel, queueLabel, resourceLabel}
33+
poolAndOutcomeLabels = []string{poolLabel, outcomeLabel, terminationReasonLabel}
3334
defaultType = "unknown"
3435
)
3536

@@ -356,6 +357,7 @@ type cycleMetrics struct {
356357

357358
scheduledJobs *prometheus.CounterVec
358359
premptedJobs *prometheus.CounterVec
360+
poolSchedulingOutcome *prometheus.CounterVec
359361
scheduleCycleTime prometheus.Histogram
360362
reconciliationCycleTime prometheus.Histogram
361363
latestCycleMetrics atomic.Pointer[perCycleMetrics]
@@ -395,10 +397,19 @@ func newCycleMetrics(publisher pulsarutils.Publisher[*metricevents.Event]) *cycl
395397
},
396398
)
397399

400+
poolSchedulingOutcome := prometheus.NewCounterVec(
401+
prometheus.CounterOpts{
402+
Name: prefix + "pool_scheduling_outcome",
403+
Help: "Number of scheduling attempts per pool by outcome",
404+
},
405+
poolAndOutcomeLabels,
406+
)
407+
398408
cycleMetrics := &cycleMetrics{
399409
leaderMetricsEnabled: true,
400410
scheduledJobs: scheduledJobs,
401411
premptedJobs: premptedJobs,
412+
poolSchedulingOutcome: poolSchedulingOutcome,
402413
scheduleCycleTime: scheduleCycleTime,
403414
reconciliationCycleTime: reconciliationCycleTime,
404415
latestCycleMetrics: atomic.Pointer[perCycleMetrics]{},
@@ -420,6 +431,7 @@ func (m *cycleMetrics) disableLeaderMetrics() {
420431
// resetLeaderMetrics clears the preempted-jobs, scheduled-jobs and
// pool-scheduling-outcome counter vectors, then replaces the latest per-cycle
// metrics with a freshly constructed, empty perCycleMetrics value.
func (m *cycleMetrics) resetLeaderMetrics() {
421432
m.premptedJobs.Reset()
422433
m.scheduledJobs.Reset()
434+
m.poolSchedulingOutcome.Reset()
423435
m.latestCycleMetrics.Store(newPerCycleMetrics())
424436
}
425437

@@ -438,6 +450,18 @@ func (m *cycleMetrics) ReportJobPreemptedWithType(job *jobdb.Job, preemptionType
438450
m.premptedJobs.WithLabelValues(job.LatestRun().Pool(), job.Queue(), job.PriorityClassName(), string(preemptionType)).Inc()
439451
}
440452

453+
// ReportPoolSchedulingOutcomes increments the pool scheduling outcome counter
// once for every entry in outcomes.
//
// Each increment is labelled with the entry's pool name, an outcome of
// PoolSchedulingOutcomeSuccess or PoolSchedulingOutcomeFailure (chosen from the
// entry's Success flag), and the entry's TerminationReason converted to a string.
// A nil or empty slice results in no increments.
func (m *cycleMetrics) ReportPoolSchedulingOutcomes(outcomes []scheduling.PoolSchedulingOutcome) {
455+
for _, o := range outcomes {
456+
terminationReason := string(o.TerminationReason)
457+
outcome := PoolSchedulingOutcomeSuccess
458+
if !o.Success {
459+
outcome = PoolSchedulingOutcomeFailure
460+
}
461+
m.poolSchedulingOutcome.WithLabelValues(o.Pool, outcome, terminationReason).Inc()
462+
}
463+
}
464+
441465
func (m *cycleMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result scheduling.SchedulerResult) {
442466
// Metrics that depend on pool
443467
currentCycle := newPerCycleMetrics()
@@ -565,6 +589,7 @@ func (m *cycleMetrics) describe(ch chan<- *prometheus.Desc) {
565589
if m.leaderMetricsEnabled {
566590
m.scheduledJobs.Describe(ch)
567591
m.premptedJobs.Describe(ch)
592+
m.poolSchedulingOutcome.Describe(ch)
568593
m.scheduleCycleTime.Describe(ch)
569594

570595
currentCycle := m.latestCycleMetrics.Load()
@@ -608,6 +633,7 @@ func (m *cycleMetrics) collect(ch chan<- prometheus.Metric) {
608633
if m.leaderMetricsEnabled {
609634
m.scheduledJobs.Collect(ch)
610635
m.premptedJobs.Collect(ch)
636+
m.poolSchedulingOutcome.Collect(ch)
611637
m.scheduleCycleTime.Collect(ch)
612638

613639
currentCycle := m.latestCycleMetrics.Load()

internal/scheduler/metrics/cycle_metrics_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,47 @@ func TestPublishCycleMetrics(t *testing.T) {
311311
m.publishCycleMetrics(ctx, schedulerResult)
312312
}
313313

314+
// TestReportPoolSchedulingOutcomes reports four outcomes, one per pool (a failure
// with the error reason, and successes with the completed, timeout and rate-limit
// reasons), and asserts that the counter for each matching
// pool/outcome/termination-reason label combination equals exactly 1.
func TestReportPoolSchedulingOutcomes(t *testing.T) {
315+
m := newCycleMetrics(pulsarutils.NoOpPublisher[*metricevents.Event]{})
316+
317+
outcomes := []scheduling.PoolSchedulingOutcome{
318+
{
319+
Pool: "pool-1",
320+
Success: false,
321+
TerminationReason: scheduling.PoolSchedulingTerminationReasonError,
322+
},
323+
{
324+
Pool: "pool-2",
325+
Success: true,
326+
TerminationReason: scheduling.PoolSchedulingTerminationReasonCompleted,
327+
},
328+
{
329+
Pool: "pool-3",
330+
Success: true,
331+
TerminationReason: scheduling.PoolSchedulingTerminationReasonTimeout,
332+
},
333+
{
334+
Pool: "pool-4",
335+
Success: true,
336+
TerminationReason: scheduling.PoolSchedulingTerminationReasonRateLimit,
337+
},
338+
}
339+
340+
m.ReportPoolSchedulingOutcomes(outcomes)
341+
342+
failureCount := testutil.ToFloat64(m.poolSchedulingOutcome.WithLabelValues("pool-1", PoolSchedulingOutcomeFailure, string(scheduling.PoolSchedulingTerminationReasonError)))
343+
assert.Equal(t, 1.0, failureCount, "failure outcome metric does not match")
344+
345+
completedCount := testutil.ToFloat64(m.poolSchedulingOutcome.WithLabelValues("pool-2", PoolSchedulingOutcomeSuccess, string(scheduling.PoolSchedulingTerminationReasonCompleted)))
346+
assert.Equal(t, 1.0, completedCount, "completed outcome metric does not match")
347+
348+
timeoutCount := testutil.ToFloat64(m.poolSchedulingOutcome.WithLabelValues("pool-3", PoolSchedulingOutcomeSuccess, string(scheduling.PoolSchedulingTerminationReasonTimeout)))
349+
assert.Equal(t, 1.0, timeoutCount, "timeout outcome metric does not match")
350+
351+
rateLimitCount := testutil.ToFloat64(m.poolSchedulingOutcome.WithLabelValues("pool-4", PoolSchedulingOutcomeSuccess, string(scheduling.PoolSchedulingTerminationReasonRateLimit)))
352+
assert.Equal(t, 1.0, rateLimitCount, "rate limit outcome metric does not match")
353+
}
354+
314355
func mustParseResourcePtr(qtyStr string) *resource.Quantity {
315356
q := resource.MustParse(qtyStr)
316357
return &q

internal/scheduler/scheduler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,9 @@ func (s *Scheduler) Run(ctx *armadacontext.Context) error {
212212
if shouldSchedule {
213213
previousSchedulingRoundEnd = s.clock.Now()
214214
}
215+
216+
s.metrics.ReportPoolSchedulingOutcomes(result.PoolSchedulingOutcomes)
217+
215218
if err != nil {
216219
ctx.Logger().WithStacktrace(err).Error("scheduling cycle failure")
217220
leaderToken = leader.InvalidLeaderToken()
@@ -366,6 +369,8 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
366369
var result *scheduling.SchedulerResult
367370
result, err = s.schedulingAlgo.Schedule(ctx, resourceUnits, txn)
368371
if err != nil {
372+
// Copy outcomes to the returned result so metrics are reported for pools that succeeded before the failure
373+
overallSchedulerResult.PoolSchedulingOutcomes = result.PoolSchedulingOutcomes
369374
return overallSchedulerResult, err
370375
}
371376

internal/scheduler/scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1870,7 +1870,7 @@ type testSchedulingAlgo struct {
18701870
func (t *testSchedulingAlgo) Schedule(_ *armadacontext.Context, _ map[string]internaltypes.ResourceList, txn *jobdb.Txn) (*scheduling.SchedulerResult, error) {
18711871
t.numberOfScheduleCalls++
18721872
if t.shouldError {
1873-
return nil, errors.New("error scheduling jobs")
1873+
return &scheduling.SchedulerResult{}, errors.New("error scheduling jobs")
18741874
}
18751875
if t.persisted {
18761876
// Exit right away if decisions have already been persisted.

internal/scheduler/scheduling/result.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package scheduling
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
78
"github.com/armadaproject/armada/internal/scheduler/jobdb"
89
"github.com/armadaproject/armada/internal/scheduler/nodedb"
9-
"github.com/armadaproject/armada/internal/scheduler/scheduling/context"
10+
"github.com/armadaproject/armada/internal/scheduler/scheduling/constraints"
11+
schedulercontext "github.com/armadaproject/armada/internal/scheduler/scheduling/context"
1012
)
1113

1214
type QueueStats struct {
@@ -24,6 +26,35 @@ type QueueStats struct {
2426
Time time.Duration
2527
}
2628

29+
// PoolSchedulingTerminationReason is a string-typed label describing why
// scheduling for a pool ended. The values below are the only ones defined here.
type PoolSchedulingTerminationReason string
30+
31+
const (
32+
PoolSchedulingTerminationReasonCompleted PoolSchedulingTerminationReason = "completed"
33+
PoolSchedulingTerminationReasonTimeout PoolSchedulingTerminationReason = "timeout"
34+
PoolSchedulingTerminationReasonRateLimit PoolSchedulingTerminationReason = "rate_limit"
35+
PoolSchedulingTerminationReasonMaxResources PoolSchedulingTerminationReason = "max_resources"
36+
PoolSchedulingTerminationReasonError PoolSchedulingTerminationReason = "error"
37+
)
38+
39+
// terminationReasonFromString maps a free-form termination reason string to a
// PoolSchedulingTerminationReason.
//
// The error messages of context.Canceled and context.DeadlineExceeded map to
// timeout; the global-rate-limit and maximum-resources-scheduled unschedulable
// reasons from the constraints package map to rate_limit and max_resources
// respectively. Every other value, including the empty string, maps to completed.
// Matching is by exact string equality.
func terminationReasonFromString(reason string) PoolSchedulingTerminationReason {
40+
switch reason {
41+
case context.Canceled.Error(), context.DeadlineExceeded.Error():
42+
return PoolSchedulingTerminationReasonTimeout
43+
case constraints.GlobalRateLimitExceededUnschedulableReason:
44+
return PoolSchedulingTerminationReasonRateLimit
45+
case constraints.MaximumResourcesScheduledUnschedulableReason:
46+
return PoolSchedulingTerminationReasonMaxResources
47+
default:
48+
return PoolSchedulingTerminationReasonCompleted
49+
}
50+
}
51+
52+
// PoolSchedulingOutcome records the result of one scheduling attempt for a
// single pool: the pool's name, whether the attempt counts as a success, and
// the reason the attempt ended. Success and TerminationReason are independent
// fields; a successful outcome may carry a reason other than completed.
type PoolSchedulingOutcome struct {
53+
Pool string
54+
Success bool
55+
TerminationReason PoolSchedulingTerminationReason
56+
}
57+
2758
type PerPoolSchedulingStats struct {
2859
// scheduling stats per queue
2960
StatsPerQueue map[string]QueueStats
@@ -36,25 +67,27 @@ type PerPoolSchedulingStats struct {
3667
// The nodeDb used in the scheduling round
3768
NodeDb *nodedb.NodeDb
3869
// The jobs scheduled in this cycle
39-
ScheduledJobs []*context.JobSchedulingContext
70+
ScheduledJobs []*schedulercontext.JobSchedulingContext
4071
// The jobs preempted in this cycle
41-
PreemptedJobs []*context.JobSchedulingContext
72+
PreemptedJobs []*schedulercontext.JobSchedulingContext
4273
// Scheduling summary for gang shapes we're interested in. Prices are determined if the job is deemed schedulable.
4374
MarketDrivenIndicativePrices IndicativeGangPricesByJobShape
4475
}
4576

4677
// SchedulerResult is returned by Rescheduler.Schedule().
4778
type SchedulerResult struct {
4879
// Running jobs that should be preempted.
49-
PreemptedJobs []*context.JobSchedulingContext
80+
PreemptedJobs []*schedulercontext.JobSchedulingContext
5081
// Queued jobs that should be scheduled.
51-
ScheduledJobs []*context.JobSchedulingContext
82+
ScheduledJobs []*schedulercontext.JobSchedulingContext
5283
// Each result may bundle the result of several scheduling decisions.
5384
// These are the corresponding scheduling contexts.
5485
// TODO: This doesn't seem like the right approach.
55-
SchedulingContexts []*context.SchedulingContext
86+
SchedulingContexts []*schedulercontext.SchedulingContext
5687
// scheduling stats
5788
PerPoolSchedulingStats map[string]PerPoolSchedulingStats
89+
// Pool scheduling outcomes for metrics reporting
90+
PoolSchedulingOutcomes []PoolSchedulingOutcome
5891
}
5992

6093
// PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result.

internal/scheduler/scheduling/scheduling_algo.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ func NewFairSchedulingAlgo(
9797
// It iterates over each executor in turn (using lexicographical order) and assigns the jobs using a LegacyScheduler, before moving onto the next executor.
9898
// It maintains state of which executors it has considered already and may take multiple Schedule() calls to consider all executors if scheduling is slow.
9999
// Newly leased jobs are updated as such in the jobDb using the transaction provided and are also returned to the caller.
100+
//
101+
// This function must always return a non-nil SchedulerResult, even when returning an error.
102+
// The result contains PoolSchedulingOutcomes that track which pools succeeded or failed,
103+
// and callers depend on this for metrics reporting.
100104
func (l *FairSchedulingAlgo) Schedule(
101105
ctx *armadacontext.Context,
102106
resourceUnits map[string]internaltypes.ResourceList,
@@ -134,7 +138,9 @@ func (l *FairSchedulingAlgo) Schedule(
134138

135139
fsctx, err := l.newFairSchedulingAlgoContext(ctx, txn, pool)
136140
if err != nil {
137-
return nil, err
141+
overallSchedulerResult.PoolSchedulingOutcomes = append(overallSchedulerResult.PoolSchedulingOutcomes,
142+
PoolSchedulingOutcome{Pool: pool.Name, Success: false, TerminationReason: PoolSchedulingTerminationReasonError})
143+
return overallSchedulerResult, err
138144
}
139145

140146
if fsctx.nodeDb.NumNodes() <= 0 {
@@ -156,12 +162,15 @@ func (l *FairSchedulingAlgo) Schedule(
156162

157163
ctx.Infof("Scheduled on executor pool %s in %v with error %v", pool.Name, time.Now().Sub(start), err)
158164

159-
if errors.Is(err, context.DeadlineExceeded) {
160-
// We've reached the scheduling time limit;
165+
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
166+
overallSchedulerResult.PoolSchedulingOutcomes = append(overallSchedulerResult.PoolSchedulingOutcomes,
167+
PoolSchedulingOutcome{Pool: pool.Name, Success: true, TerminationReason: PoolSchedulingTerminationReasonTimeout})
161168
ctx.Info("stopped scheduling early as we have hit the maximum scheduling duration")
162169
break
163170
} else if err != nil {
164-
return nil, err
171+
overallSchedulerResult.PoolSchedulingOutcomes = append(overallSchedulerResult.PoolSchedulingOutcomes,
172+
PoolSchedulingOutcome{Pool: pool.Name, Success: false, TerminationReason: PoolSchedulingTerminationReasonError})
173+
return overallSchedulerResult, err
165174
}
166175
if l.schedulingContextRepository != nil {
167176
l.schedulingContextRepository.StoreSchedulingContext(sctx)
@@ -170,13 +179,21 @@ func (l *FairSchedulingAlgo) Schedule(
170179
preemptedJobs := PreemptedJobsFromSchedulerResult(schedulerResult)
171180
scheduledJobs := ScheduledJobsFromSchedulerResult(schedulerResult)
172181

182+
terminationReason := terminationReasonFromString(sctx.TerminationReason)
173183
if err := txn.Upsert(preemptedJobs); err != nil {
174-
return nil, err
184+
overallSchedulerResult.PoolSchedulingOutcomes = append(overallSchedulerResult.PoolSchedulingOutcomes,
185+
PoolSchedulingOutcome{Pool: pool.Name, Success: false, TerminationReason: PoolSchedulingTerminationReasonError})
186+
return overallSchedulerResult, err
175187
}
176188
if err := txn.Upsert(scheduledJobs); err != nil {
177-
return nil, err
189+
overallSchedulerResult.PoolSchedulingOutcomes = append(overallSchedulerResult.PoolSchedulingOutcomes,
190+
PoolSchedulingOutcome{Pool: pool.Name, Success: false, TerminationReason: PoolSchedulingTerminationReasonError})
191+
return overallSchedulerResult, err
178192
}
179193

194+
overallSchedulerResult.PoolSchedulingOutcomes = append(overallSchedulerResult.PoolSchedulingOutcomes,
195+
PoolSchedulingOutcome{Pool: pool.Name, Success: true, TerminationReason: terminationReason})
196+
180197
// Aggregate changes across executors.
181198
overallSchedulerResult.PreemptedJobs = append(overallSchedulerResult.PreemptedJobs, schedulerResult.PreemptedJobs...)
182199
overallSchedulerResult.ScheduledJobs = append(overallSchedulerResult.ScheduledJobs, schedulerResult.ScheduledJobs...)

0 commit comments

Comments
 (0)