Skip to content

Commit 3543b5b

Browse files
Add Idealised Value Metric (#4414)
This PR adds a new metric (idealised_scheduled_value) which represents the "ideal" value of jobs that could be scheduled each scheduling cycle. We calculate this by considering that all (pool) resources are present on one giant node. This frees us from pesky constraints such as "we need to binpack efficiently" and allows us to quickly generate an upper bound for the maximum amount of value that could be scheduled. --------- Signed-off-by: Chris Martin <chris@cmartinit.co.uk> Co-authored-by: Christopher Martin <Chris.Martin@gresearch.co.uk>
1 parent 4f023a9 commit 3543b5b

23 files changed

+1323
-98
lines changed

internal/scheduler/internaltypes/node.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,10 @@ func (node *Node) GetLabels() map[string]string {
249249
return deepCopyLabels(node.labels)
250250
}
251251

252+
// GetRunningJobIds returns the keys of node.AllocatedByJobId as a slice
// (job ids, going by the field name).
// NOTE(review): the keys are collected from a Go map, so the order of the
// returned slice is unspecified; sort it if a deterministic order is needed.
func (node *Node) GetRunningJobIds() []string {
253+
return maps.Keys(node.AllocatedByJobId)
254+
}
255+
252256
func (node *Node) GetLabelValue(key string) (string, bool) {
253257
val, ok := node.labels[key]
254258
return val, ok

internal/scheduler/internaltypes/node_factory.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ func (f *NodeFactory) CreateNodeAndType(
6464
labels map[string]string,
6565
totalResources ResourceList,
6666
allocatableResources ResourceList,
67-
unallocatableResources map[int32]ResourceList,
6867
allocatableByPriority map[int32]ResourceList,
6968
) *Node {
7069
return CreateNodeAndType(

internal/scheduler/jobdb/job.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,10 +425,22 @@ func (job *Job) WithBidPrices(bids map[string]pricing.Bid) *Job {
425425
return j
426426
}
427427

428+
// GetBidPrice resolves the current bid price of the job for the given pool.
429+
// It considers running, non-preemptible jobs to have an effectively infinite price:
// when the job is not queued and its priority class is not preemptible, it returns
// pricing.NonPreemptibleRunningPrice. In every other case it returns the result of
// getBidPrice(pool).
428430
func (job *Job) GetBidPrice(pool string) float64 {
429431
if !job.queued && !job.priorityClass.Preemptible {
430432
return pricing.NonPreemptibleRunningPrice
431433
}
434+
return job.getBidPrice(pool)
435+
}
436+
437+
// GetRawBidPrice resolves the current bid price of the job for the given pool.
438+
// It considers running, non-preemptible jobs to have the simple job running price
// (i.e. ignores preemptibility pricing): unlike GetBidPrice, it never substitutes
// pricing.NonPreemptibleRunningPrice and always returns the result of getBidPrice(pool).
439+
func (job *Job) GetRawBidPrice(pool string) float64 {
440+
return job.getBidPrice(pool)
441+
}
442+
443+
func (job *Job) getBidPrice(pool string) float64 {
432444
bidPrice, present := job.bidPricesPool[pool]
433445
if !present {
434446
return 0

internal/scheduler/metrics/cycle_metrics.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ type perCycleMetrics struct {
6363
nodeAllocatedResource *prometheus.GaugeVec
6464
indicativePrice *prometheus.GaugeVec
6565
indicativePriceSchedulable *prometheus.GaugeVec
66+
idealisedScheduledValue *prometheus.GaugeVec
67+
realisedScheduledValue *prometheus.GaugeVec
6668
}
6769

6870
func newPerCycleMetrics() *perCycleMetrics {
@@ -290,6 +292,22 @@ func newPerCycleMetrics() *perCycleMetrics {
290292
poolAndShapeAndReasonLabels,
291293
)
292294

295+
idealisedScheduledValue := prometheus.NewGaugeVec(
296+
prometheus.GaugeOpts{
297+
Name: prefix + "idealised_scheduled_value",
298+
Help: "Idealised value scheduled per queue",
299+
},
300+
poolAndQueueLabels,
301+
)
302+
303+
realisedScheduledValue := prometheus.NewGaugeVec(
304+
prometheus.GaugeOpts{
305+
Name: prefix + "realised_scheduled_value",
306+
Help: "Realised value scheduled per queue",
307+
},
308+
poolAndQueueLabels,
309+
)
310+
293311
return &perCycleMetrics{
294312
consideredJobs: consideredJobs,
295313
fairShare: fairShare,
@@ -319,6 +337,8 @@ func newPerCycleMetrics() *perCycleMetrics {
319337
nodeAllocatedResource: nodeAllocatedResource,
320338
indicativePrice: indicativePrice,
321339
indicativePriceSchedulable: indicativePriceSchedulable,
340+
idealisedScheduledValue: idealisedScheduledValue,
341+
realisedScheduledValue: realisedScheduledValue,
322342
}
323343
}
324344

@@ -432,6 +452,8 @@ func (m *cycleMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result
432452
currentCycle.shortJobPenalty.WithLabelValues(pool, queue).Set(shortJobPenalty)
433453
currentCycle.queueWeight.WithLabelValues(pool, queue).Set(queueContext.Weight)
434454
currentCycle.rawQueueWeight.WithLabelValues(pool, queue).Set(queueContext.RawWeight)
455+
currentCycle.idealisedScheduledValue.WithLabelValues(pool, queue).Set(queueContext.IdealisedValue)
456+
currentCycle.realisedScheduledValue.WithLabelValues(pool, queue).Set(queueContext.RealisedValue)
435457
for _, r := range queueContext.GetBillableResource().GetResources() {
436458
currentCycle.billableResource.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue))
437459
}
@@ -563,6 +585,8 @@ func (m *cycleMetrics) describe(ch chan<- *prometheus.Desc) {
563585
currentCycle.nodeAllocatedResource.Describe(ch)
564586
currentCycle.indicativePrice.Describe(ch)
565587
currentCycle.indicativePriceSchedulable.Describe(ch)
588+
currentCycle.idealisedScheduledValue.Describe(ch)
589+
currentCycle.realisedScheduledValue.Describe(ch)
566590
}
567591

568592
m.reconciliationCycleTime.Describe(ch)
@@ -603,6 +627,8 @@ func (m *cycleMetrics) collect(ch chan<- prometheus.Metric) {
603627
currentCycle.nodeAllocatedResource.Collect(ch)
604628
currentCycle.indicativePrice.Collect(ch)
605629
currentCycle.indicativePriceSchedulable.Collect(ch)
630+
currentCycle.idealisedScheduledValue.Collect(ch)
631+
currentCycle.realisedScheduledValue.Collect(ch)
606632
}
607633

608634
m.reconciliationCycleTime.Collect(ch)

internal/scheduler/pricing/bid_price.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package pricing
33
import (
44
"time"
55

6+
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
7+
68
"github.com/armadaproject/armada/internal/common/armadacontext"
79
"github.com/armadaproject/armada/internal/common/cache"
810
"github.com/armadaproject/armada/pkg/bidstore"
@@ -12,8 +14,8 @@ type BidPriceCache struct {
1214
cache *cache.GenericCache[BidPriceSnapshot]
1315
}
1416

15-
func NewBidPriceCache(client bidstore.BidRetrieverServiceClient, updateFrequency time.Duration) *BidPriceCache {
16-
svc := NewExternalBidPriceService(client)
17+
func NewBidPriceCache(client bidstore.BidRetrieverServiceClient, rlf *internaltypes.ResourceListFactory, updateFrequency time.Duration) *BidPriceCache {
18+
svc := NewExternalBidPriceService(client, rlf)
1719
return &BidPriceCache{
1820
cache: cache.NewGenericCache(svc.GetBidPrices, updateFrequency),
1921
}

internal/scheduler/pricing/bid_price_service_test.go

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,69 @@ import (
66

77
"github.com/stretchr/testify/assert"
88
"github.com/stretchr/testify/require"
9+
"k8s.io/apimachinery/pkg/api/resource"
910

11+
schedulerconfiguration "github.com/armadaproject/armada/internal/scheduler/configuration"
12+
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
1013
"github.com/armadaproject/armada/pkg/bidstore"
1114
)
1215

16+
// testResourceListFactory is the ResourceListFactory shared by the tests in this file.
// It is configured with three resource types (memory at resolution "1", cpu and
// nvidia.com/gpu at resolution "1m") and no floating resources.
// NOTE(review): the error returned by NewResourceListFactory is discarded with `_`;
// if construction can fail, the tests would carry on with whatever value was returned.
var testResourceListFactory, _ = internaltypes.NewResourceListFactory(
17+
[]schedulerconfiguration.ResourceType{
18+
{Name: "memory", Resolution: resource.MustParse("1")},
19+
{Name: "cpu", Resolution: resource.MustParse("1m")},
20+
{Name: "nvidia.com/gpu", Resolution: resource.MustParse("1m")},
21+
},
22+
[]schedulerconfiguration.FloatingResourceConfig{},
23+
)
24+
1325
func TestConvert(t *testing.T) {
1426
tests := map[string]struct {
1527
input *bidstore.RetrieveBidsResponse
1628
expected BidPriceSnapshot
1729
}{
1830
"empty input": {
1931
input: &bidstore.RetrieveBidsResponse{},
32+
expected: BidPriceSnapshot{
33+
Bids: make(map[PriceKey]map[string]Bid),
34+
ResourceUnits: map[string]internaltypes.ResourceList{},
35+
},
36+
},
37+
"resourceUnit": {
38+
input: &bidstore.RetrieveBidsResponse{
39+
PoolResourceUnits: map[string]*bidstore.ResourceShape{
40+
"pool1": {
41+
Resources: map[string]*resource.Quantity{
42+
"cpu": mustParsePtr("1"),
43+
"memory": mustParsePtr("1Gi"),
44+
},
45+
},
46+
"pool2": {
47+
Resources: map[string]*resource.Quantity{
48+
"cpu": mustParsePtr("2"),
49+
"memory": mustParsePtr("2Gi"),
50+
"nvidia.com/gpu": mustParsePtr("1"),
51+
},
52+
},
53+
},
54+
},
2055
expected: BidPriceSnapshot{
2156
Bids: make(map[PriceKey]map[string]Bid),
57+
ResourceUnits: map[string]internaltypes.ResourceList{
58+
"pool1": testResourceListFactory.FromNodeProto(
59+
map[string]*resource.Quantity{
60+
"cpu": mustParsePtr("1"),
61+
"memory": mustParsePtr("1Gi"),
62+
},
63+
),
64+
"pool2": testResourceListFactory.FromNodeProto(
65+
map[string]*resource.Quantity{
66+
"cpu": mustParsePtr("2"),
67+
"memory": mustParsePtr("2Gi"),
68+
"nvidia.com/gpu": mustParsePtr("1"),
69+
},
70+
),
71+
},
2272
},
2373
},
2474
"single bid with both phases": {
@@ -55,6 +105,7 @@ func TestConvert(t *testing.T) {
55105
},
56106
},
57107
},
108+
ResourceUnits: map[string]internaltypes.ResourceList{},
58109
},
59110
},
60111
"uses fallback if direct bid is missing": {
@@ -63,7 +114,7 @@ func TestConvert(t *testing.T) {
63114
"queue1": {
64115
PoolBids: map[string]*bidstore.PoolBids{
65116
"pool1": {
66-
FallbackBid: &bidstore.PriceBandBids{
117+
FallbackBids: &bidstore.PriceBandBids{
67118
PricingPhaseBids: []*bidstore.PricingPhaseBid{
68119
CreateQueuedBid(5.0),
69120
CreateRunningBid(10.0),
@@ -158,13 +209,14 @@ func TestConvert(t *testing.T) {
158209
},
159210
},
160211
},
212+
ResourceUnits: map[string]internaltypes.ResourceList{},
161213
},
162214
},
163215
}
164216

165217
for name, tc := range tests {
166218
t.Run(name, func(t *testing.T) {
167-
actual := convert(tc.input)
219+
actual := convert(tc.input, testResourceListFactory)
168220

169221
// Remove non-deterministic time field before comparison
170222
require.False(t, actual.Timestamp.IsZero(), "timestamp should be set")
@@ -192,3 +244,8 @@ func CreateRunningBid(p float64) *bidstore.PricingPhaseBid {
192244
},
193245
}
194246
}
247+
248+
// mustParsePtr parses qty with resource.MustParse and returns a pointer to the
// resulting Quantity, for use where a *resource.Quantity is needed in test fixtures.
// resource.MustParse panics on a string it cannot parse, so qty must be a valid
// quantity literal.
func mustParsePtr(qty string) *resource.Quantity {
249+
q := resource.MustParse(qty)
250+
// q is a fresh local on every call, so each returned pointer refers to its own Quantity.
return &q
251+
}

internal/scheduler/pricing/bid_service.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"time"
55

66
"github.com/armadaproject/armada/internal/common/armadacontext"
7+
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
78
"github.com/armadaproject/armada/internal/scheduler/queue"
89
"github.com/armadaproject/armada/pkg/bidstore"
910
)
@@ -59,11 +60,13 @@ func (b *LocalBidPriceService) GetBidPrices(ctx *armadacontext.Context) (BidPric
5960

6061
// ExternalBidPriceService provides bid prices backed by a bidstore retriever client.
// Its GetBidPrices method passes the retrieved response and rlf to convert to build
// a BidPriceSnapshot.
type ExternalBidPriceService struct {
6162
// client is the bidstore client this service was constructed with.
client bidstore.BidRetrieverServiceClient
63+
// rlf is handed to convert, which calls rlf.FromNodeProto to build the
// per-pool ResourceList entries of the snapshot's ResourceUnits.
rlf *internaltypes.ResourceListFactory
6264
}
6365

64-
func NewExternalBidPriceService(client bidstore.BidRetrieverServiceClient) *ExternalBidPriceService {
66+
// NewExternalBidPriceService returns an ExternalBidPriceService holding the given
// bidstore client and resource list factory.
// Both arguments are stored as-is; no nil checks are performed here.
func NewExternalBidPriceService(client bidstore.BidRetrieverServiceClient, rlf *internaltypes.ResourceListFactory) *ExternalBidPriceService {
6567
return &ExternalBidPriceService{
6668
client: client,
69+
rlf: rlf,
6770
}
6871
}
6972

@@ -72,13 +75,18 @@ func (b *ExternalBidPriceService) GetBidPrices(ctx *armadacontext.Context) (BidP
7275
if err != nil {
7376
return BidPriceSnapshot{}, err
7477
}
75-
return convert(resp), nil
78+
return convert(resp, b.rlf), nil
7679
}
7780

78-
func convert(resp *bidstore.RetrieveBidsResponse) BidPriceSnapshot {
81+
func convert(resp *bidstore.RetrieveBidsResponse, rlf *internaltypes.ResourceListFactory) BidPriceSnapshot {
7982
snapshot := BidPriceSnapshot{
80-
Timestamp: time.Now(),
81-
Bids: make(map[PriceKey]map[string]Bid),
83+
Timestamp: time.Now(),
84+
Bids: make(map[PriceKey]map[string]Bid),
85+
ResourceUnits: make(map[string]internaltypes.ResourceList),
86+
}
87+
88+
for pool, resourceShape := range resp.PoolResourceUnits {
89+
snapshot.ResourceUnits[pool] = rlf.FromNodeProto(resourceShape.Resources)
8290
}
8391

8492
for queue, qb := range resp.QueueBids {
@@ -88,7 +96,7 @@ func convert(resp *bidstore.RetrieveBidsResponse) BidPriceSnapshot {
8896

8997
for pool, poolBids := range qb.PoolBids {
9098
bb, _ := poolBids.GetBidsForBand(band)
91-
fallback := poolBids.GetFallbackBid()
99+
fallback := poolBids.GetFallbackBids()
92100

93101
queued, hasQueued := getPrice(bb, fallback, bidstore.PricingPhase_PRICING_PHASE_QUEUEING)
94102
running, hasRunning := getPrice(bb, fallback, bidstore.PricingPhase_PRICING_PHASE_RUNNING)

internal/scheduler/pricing/types.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"time"
55

66
"github.com/armadaproject/armada/internal/common/armadacontext"
7+
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
78
"github.com/armadaproject/armada/pkg/bidstore"
89
)
910

@@ -19,8 +20,9 @@ type PriceKey struct {
1920
}
2021

2122
type BidPriceSnapshot struct {
22-
Timestamp time.Time
23-
Bids map[PriceKey]map[string]Bid
23+
Timestamp time.Time
24+
Bids map[PriceKey]map[string]Bid
25+
ResourceUnits map[string]internaltypes.ResourceList
2426
}
2527

2628
type Bid struct {

internal/scheduler/scheduler.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -325,14 +325,14 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
325325
// Schedule jobs.
326326
if shouldSchedule {
327327
start := time.Now()
328-
err := s.updateJobPrices(ctx, txn)
328+
resourceUnits, err := s.updateJobPrices(ctx, txn)
329329
if err != nil {
330330
return overallSchedulerResult, err
331331
}
332332
ctx.Logger().Infof("updating job prices in %s", time.Now().Sub(start))
333333

334334
var result *scheduling.SchedulerResult
335-
result, err = s.schedulingAlgo.Schedule(ctx, txn)
335+
result, err = s.schedulingAlgo.Schedule(ctx, resourceUnits, txn)
336336
if err != nil {
337337
return overallSchedulerResult, err
338338
}
@@ -463,16 +463,16 @@ func (s *Scheduler) syncState(ctx *armadacontext.Context, initial bool) ([]*jobd
463463

464464
// TODO - This is highly inefficient and will only work if market driven pools are small
465465
// We should rewrite how bids are stored in an efficient manner
466-
func (s *Scheduler) updateJobPrices(ctx *armadacontext.Context, txn *jobdb.Txn) error {
466+
func (s *Scheduler) updateJobPrices(ctx *armadacontext.Context, txn *jobdb.Txn) (map[string]internaltypes.ResourceList, error) {
467467
if len(s.marketDrivenPools) == 0 {
468-
return nil
468+
return nil, nil
469469
}
470470

471471
jobs := txn.GetAll()
472472
jobsByQueue := map[string]map[bidstore.PriceBand][]*jobdb.Job{}
473473
updatedBids, err := s.bidPriceProvider.GetBidPrices(ctx)
474474
if err != nil {
475-
return err
475+
return nil, err
476476
}
477477

478478
hasMarketDrivenPool := func(j *jobdb.Job) bool {
@@ -512,7 +512,11 @@ func (s *Scheduler) updateJobPrices(ctx *armadacontext.Context, txn *jobdb.Txn)
512512
}
513513
}
514514
ctx.Logger().Infof("updating the prices of %d jobs", len(updatedJobs))
515-
return txn.Upsert(updatedJobs)
515+
err = txn.Upsert(updatedJobs)
516+
if err != nil {
517+
return nil, err
518+
}
519+
return updatedBids.ResourceUnits, nil
516520
}
517521

518522
func (s *Scheduler) createSchedulingInfoWithNodeAntiAffinityForAttemptedRuns(job *jobdb.Job) (*internaltypes.JobSchedulingInfo, error) {

internal/scheduler/scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1763,7 +1763,7 @@ type testSchedulingAlgo struct {
17631763
persisted bool
17641764
}
17651765

1766-
func (t *testSchedulingAlgo) Schedule(_ *armadacontext.Context, txn *jobdb.Txn) (*scheduling.SchedulerResult, error) {
1766+
func (t *testSchedulingAlgo) Schedule(_ *armadacontext.Context, _ map[string]internaltypes.ResourceList, txn *jobdb.Txn) (*scheduling.SchedulerResult, error) {
17671767
t.numberOfScheduleCalls++
17681768
if t.shouldError {
17691769
return nil, errors.New("error scheduling jobs")

0 commit comments

Comments
 (0)