
Commit 518972f

wip
1 parent 5762b91 commit 518972f

File tree

5 files changed: +39 -74 lines changed

internal/common/resource/resource.go

Lines changed: 7 additions & 0 deletions

@@ -170,6 +170,13 @@ func (a ComputeResources) LimitToZero() {
 	}
 }
 
+// Zero zeroes out a such that all quantities have value 0.
+func (a ComputeResources) Zero() {
+	for key := range a {
+		a[key] = *resource.NewQuantity(0, resource.BinarySI)
+	}
+}
+
 // IsZero function checks if every value in "a" is zero. If any value is not zero it returns false, if all are zero, it returns true.
 func (a ComputeResources) IsZero() bool {
 	if len(a) == 0 {
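The new method mutates the receiver's map entries directly. Below is a minimal, self-contained sketch of how it behaves, assuming (as the range-and-assign loop in the diff implies) that ComputeResources is a map from resource name to resource.Quantity; the type is redeclared locally so the example compiles on its own:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// ComputeResources mirrors the assumed definition in internal/common/resource:
// a map from resource name to quantity.
type ComputeResources map[string]resource.Quantity

// Zero zeroes out a in place: every key keeps its entry, but its quantity is
// replaced with a fresh zero-valued one, as in the diff above.
func (a ComputeResources) Zero() {
	for key := range a {
		a[key] = *resource.NewQuantity(0, resource.BinarySI)
	}
}

func main() {
	cr := ComputeResources{
		"cpu":    resource.MustParse("4"),
		"memory": resource.MustParse("16Gi"),
	}
	cr.Zero()
	for name, q := range cr {
		fmt.Printf("%s: %s (zero=%v)\n", name, q.String(), q.IsZero())
	}
}

Writing a fresh *resource.NewQuantity(0, resource.BinarySI) into each key avoids the copy-then-Set dance that the removed ResourceList.Zero needed (see resourcelist.go below).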

internal/scheduler/metrics.go

Lines changed: 19 additions & 19 deletions

@@ -273,10 +273,10 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
 
 	cordonedStatusByCluster := map[string]*clusterCordonedStatus{}
 	phaseCountByQueue := map[queuePhaseMetricKey]int{}
-	allocatedResourceByQueue := map[queueMetricKey]schedulerobjects.ResourceList{}
-	usedResourceByQueue := map[queueMetricKey]schedulerobjects.ResourceList{}
-	availableResourceByCluster := map[clusterMetricKey]schedulerobjects.ResourceList{}
-	totalResourceByCluster := map[clusterMetricKey]schedulerobjects.ResourceList{}
+	allocatedResourceByQueue := map[queueMetricKey]resource.ComputeResources{}
+	usedResourceByQueue := map[queueMetricKey]resource.ComputeResources{}
+	availableResourceByCluster := map[clusterMetricKey]resource.ComputeResources{}
+	totalResourceByCluster := map[clusterMetricKey]resource.ComputeResources{}
 	schedulableNodeCountByCluster := map[clusterMetricKey]int{}
 	totalNodeCountByCluster := map[clusterMetricKey]int{}
 
@@ -356,11 +356,11 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
 	totalNodeCountByCluster[clusterKey]++
 
 	// Add total resources to the home cluster pool
-	addToResourceListMap(totalResourceByCluster, clusterKey, node.TotalResources)
+	addToResourceListMap(totalResourceByCluster, clusterKey, node.TotalResources.ToComputeResources())
 
 	// Add total resources to the away cluster pool
 	for _, awayClusterKey := range awayClusterKeys {
-		addToResourceListMap(totalResourceByCluster, awayClusterKey, node.TotalResources)
+		addToResourceListMap(totalResourceByCluster, awayClusterKey, node.TotalResources.ToComputeResources())
 	}
 
 	for _, resourceUsageQp := range node.ResourceUsageByQueueAndPool {
@@ -370,7 +370,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
 		queueName: resourceUsageQp.Queue,
 		nodeType:  node.ReportingNodeType,
 	}
-	addToResourceListMap(usedResourceByQueue, queueKey, *resourceUsageQp.Resources)
+	addToResourceListMap(usedResourceByQueue, queueKey, resourceUsageQp.Resources.ToComputeResources())
 	}
 
 	for runId, jobRunState := range node.StateByJobRunId {
@@ -398,14 +398,14 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
 		priorityClass: job.PriorityClassName(),
 		nodeType:      node.ReportingNodeType,
 	}
-	addToResourceListMap(allocatedResourceByQueue, queueKey, jobRequirements)
+	addToResourceListMap(allocatedResourceByQueue, queueKey, jobRequirements.ToComputeResources())
 
 	// If the job is running on its home pool, then remove the resources from all the away pools
 	if jobPool == nodePool {
 		schedulerobjects.ResourceListFromV1ResourceList(podRequirements.ResourceRequirements.Requests)
 		for _, awayClusterKey := range awayClusterKeys {
-			subtractFromResourceListMap(totalResourceByCluster, awayClusterKey, jobRequirements)
-			subtractFromResourceListMap(availableResourceByCluster, awayClusterKey, jobRequirements)
+			subtractFromResourceListMap(totalResourceByCluster, awayClusterKey, jobRequirements.ToComputeResources())
+			subtractFromResourceListMap(availableResourceByCluster, awayClusterKey, jobRequirements.ToComputeResources())
		}
	}
	}
@@ -415,7 +415,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
 	}
 
 	for _, pool := range c.floatingResourceTypes.AllPools() {
-		totalFloatingResources := schedulerobjects.ResourceList{Resources: c.floatingResourceTypes.GetTotalAvailableForPoolAsMap(pool)}
+		totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPoolAsMap(pool)
 		clusterKey := clusterMetricKey{
 			cluster: "floating",
 			pool:    pool,
@@ -430,22 +430,22 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
 		clusterMetrics = append(clusterMetrics, commonmetrics.NewQueueLeasedPodCount(float64(v), k.cluster, k.pool, k.queueName, k.phase, k.nodeType))
 	}
 	for k, r := range allocatedResourceByQueue {
-		for resourceKey, resourceValue := range r.Resources {
+		for resourceKey, resourceValue := range r {
 			clusterMetrics = append(clusterMetrics, commonmetrics.NewQueueAllocated(resource.QuantityAsFloat64(resourceValue), k.queueName, k.cluster, k.pool, k.priorityClass, resourceKey, k.nodeType))
 		}
 	}
 	for k, r := range usedResourceByQueue {
-		for resourceKey, resourceValue := range r.Resources {
+		for resourceKey, resourceValue := range r {
 			clusterMetrics = append(clusterMetrics, commonmetrics.NewQueueUsed(resource.QuantityAsFloat64(resourceValue), k.queueName, k.cluster, k.pool, resourceKey, k.nodeType))
 		}
 	}
 	for k, r := range availableResourceByCluster {
-		for resourceKey, resourceValue := range r.Resources {
+		for resourceKey, resourceValue := range r {
 			clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterAvailableCapacity(resource.QuantityAsFloat64(resourceValue), k.cluster, k.pool, resourceKey, k.nodeType))
 		}
 	}
 	for k, r := range totalResourceByCluster {
-		for resourceKey, resourceValue := range r.Resources {
+		for resourceKey, resourceValue := range r {
 			clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterTotalCapacity(resource.QuantityAsFloat64(resourceValue), k.cluster, k.pool, resourceKey, k.nodeType))
 		}
 	}
@@ -461,18 +461,18 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
 	return clusterMetrics, nil
 }
 
-func addToResourceListMap[K comparable](m map[K]schedulerobjects.ResourceList, key K, value schedulerobjects.ResourceList) {
+func addToResourceListMap[K comparable](m map[K]resource.ComputeResources, key K, value resource.ComputeResources) {
 	if _, exists := m[key]; !exists {
-		m[key] = schedulerobjects.ResourceList{}
+		m[key] = resource.ComputeResources{}
 	}
 	newValue := m[key]
 	newValue.Add(value)
 	m[key] = newValue
 }
 
-func subtractFromResourceListMap[K comparable](m map[K]schedulerobjects.ResourceList, key K, value schedulerobjects.ResourceList) {
+func subtractFromResourceListMap[K comparable](m map[K]resource.ComputeResources, key K, value resource.ComputeResources) {
 	if _, exists := m[key]; !exists {
-		m[key] = schedulerobjects.ResourceList{}
+		m[key] = resource.ComputeResources{}
 	}
 	newValue := m[key]
 	newValue.Sub(value)
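addToResourceListMap and subtractFromResourceListMap are generic over the map key, so one implementation serves both the queueMetricKey- and clusterMetricKey-keyed maps. Below is a self-contained sketch of the same lazily-initialising read-modify-write pattern, with a stand-in ComputeResources type and an assumed Add method (neither is copied from the repo):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// ComputeResources stands in for the assumed Armada type.
type ComputeResources map[string]resource.Quantity

// Add accumulates b into a, key by key.
func (a ComputeResources) Add(b ComputeResources) {
	for t, qb := range b {
		qa := a[t]
		qa.Add(qb)
		a[t] = qa
	}
}

// addToResourceListMap mirrors the helper in the diff: it lazily
// initialises the entry for key, then accumulates value into it.
func addToResourceListMap[K comparable](m map[K]ComputeResources, key K, value ComputeResources) {
	if _, exists := m[key]; !exists {
		m[key] = ComputeResources{}
	}
	newValue := m[key]
	newValue.Add(value)
	m[key] = newValue
}

func main() {
	byQueue := map[string]ComputeResources{}
	addToResourceListMap(byQueue, "queue-a", ComputeResources{"cpu": resource.MustParse("2")})
	addToResourceListMap(byQueue, "queue-a", ComputeResources{"cpu": resource.MustParse("3")})
	q := byQueue["queue-a"]["cpu"]
	fmt.Println(q.String()) // 5
}

Because ComputeResources is a map type, Add already mutates the stored value in place; the trailing m[key] = newValue is a no-op for maps, kept here only to mirror the helper in the diff (where it was required when the value type was the ResourceList struct).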
Lines changed: 6 additions & 4 deletions

@@ -1,9 +1,11 @@
 package schedulerobjects
 
-func (node *Node) AvailableArmadaResource() ResourceList {
-	tr := node.TotalResources.DeepCopy()
+import "github.com/armadaproject/armada/internal/common/resource"
+
+func (node *Node) AvailableArmadaResource() resource.ComputeResources {
+	cr := node.TotalResources.ToComputeResources()
 	for _, rl := range node.UnallocatableResources {
-		tr.Sub(rl)
+		cr.Sub(rl.ToComputeResources())
 	}
-	return tr
+	return cr
 }
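AvailableArmadaResource now converts the node's total to ComputeResources and subtracts each unallocatable entry from it. A sketch under the same stand-in assumptions (the availableResource helper and the Sub method are illustrative, not from the repo), showing why the conversion must copy before subtracting:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// ComputeResources stands in for the assumed Armada type.
type ComputeResources map[string]resource.Quantity

// Sub subtracts b from a, key by key.
func (a ComputeResources) Sub(b ComputeResources) {
	for t, qb := range b {
		qa := a[t]
		qa.Sub(qb)
		a[t] = qa
	}
}

// availableResource (illustrative) starts from the node's total and
// subtracts every unallocatable slice, as AvailableArmadaResource now does.
func availableResource(total ComputeResources, unallocatable []ComputeResources) ComputeResources {
	cr := ComputeResources{}
	for t, q := range total {
		cr[t] = q.DeepCopy() // copy first, so the caller's total is not mutated
	}
	for _, rl := range unallocatable {
		cr.Sub(rl)
	}
	return cr
}

func main() {
	total := ComputeResources{"cpu": resource.MustParse("8"), "memory": resource.MustParse("32Gi")}
	unallocatable := []ComputeResources{{"cpu": resource.MustParse("1")}}
	avail := availableResource(total, unallocatable)
	cpu := avail["cpu"]
	fmt.Println(cpu.String()) // 7
}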

internal/scheduler/schedulerobjects/resourcelist.go

Lines changed: 7 additions & 39 deletions

@@ -3,6 +3,8 @@ package schedulerobjects
 import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+
+	resource2 "github.com/armadaproject/armada/internal/common/resource"
 )
 
 // Most jobs specify 3 or fewer resources. We add 1 extra for margin.
@@ -38,12 +40,6 @@ func V1ResourceListFromResourceList(rl ResourceList) v1.ResourceList {
 
 type QuantityByTAndResourceType[T comparable] map[T]ResourceList
 
-func (a QuantityByTAndResourceType[T]) AddResourceList(t T, rlb ResourceList) {
-	rla := a[t]
-	rla.Add(rlb)
-	a[t] = rla
-}
-
 func (rl *ResourceList) Get(resourceType string) resource.Quantity {
 	return rl.Resources[resourceType]
 }
@@ -62,31 +58,6 @@ func (a *ResourceList) Add(b ResourceList) {
 	}
 }
 
-func (a *ResourceList) AddV1ResourceList(b v1.ResourceList) {
-	a.initialise()
-	for t, qb := range b {
-		qa := a.Resources[string(t)]
-		qa.Add(qb)
-		a.Resources[string(t)] = qa
-	}
-}
-
-func (a *ResourceList) SubV1ResourceList(b v1.ResourceList) {
-	a.initialise()
-	for t, qb := range b {
-		qa := a.Resources[string(t)]
-		qa.Sub(qb)
-		a.Resources[string(t)] = qa
-	}
-}
-
-func (rl *ResourceList) AddQuantity(resourceType string, quantity resource.Quantity) {
-	rl.initialise()
-	q := rl.Resources[resourceType]
-	q.Add(quantity)
-	rl.Resources[resourceType] = q
-}
-
 func (a *ResourceList) Sub(b ResourceList) {
 	a.initialise()
 	for t, qb := range b.Resources {
@@ -109,14 +80,6 @@ func (rl ResourceList) DeepCopy() ResourceList {
 	return rv
 }
 
-// Zero zeroes out rl in-place, such that all quantities have value 0.
-func (rl ResourceList) Zero() {
-	for t, q := range rl.Resources {
-		q.Set(0)
-		rl.Resources[t] = q
-	}
-}
-
 func (a ResourceList) Equal(b ResourceList) bool {
 	for t, qa := range a.Resources {
 		if qa.Cmp(b.Get(t)) != 0 {
@@ -137,6 +100,11 @@ func (rl *ResourceList) initialise() {
 	}
 }
 
+func (rl ResourceList) ToComputeResources() resource2.ComputeResources {
+	cpy := rl.DeepCopy()
+	return cpy.Resources
+}
+
 // AllocatableByPriorityAndResourceType accounts for resources that can be allocated to pods of a given priority.
 // E.g., AllocatableByPriorityAndResourceType[5]["cpu"] is the amount of CPU available to pods with priority 5,
 // where allocatable resources = unused resources + resources allocated to lower-priority pods.
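ToComputeResources deep-copies the ResourceList before exposing its inner map, so callers receive a view they can mutate without corrupting the original. A sketch with stand-in types; the explicit copy loop spells out what DeepCopy provides in the real code:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// ComputeResources and ResourceList stand in for the real types.
type ComputeResources map[string]resource.Quantity

type ResourceList struct {
	Resources map[string]resource.Quantity
}

// ToComputeResources mirrors the added method: deep-copy first, then hand
// out the copy's map, so the caller can mutate the result freely.
func (rl ResourceList) ToComputeResources() ComputeResources {
	cpy := make(map[string]resource.Quantity, len(rl.Resources))
	for t, q := range rl.Resources {
		cpy[t] = q.DeepCopy()
	}
	return cpy
}

func main() {
	rl := ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("4")}}
	cr := rl.ToComputeResources()
	cr["cpu"] = resource.MustParse("0") // mutating the copy...
	orig := rl.Resources["cpu"]
	fmt.Println(orig.String()) // ...leaves the original at 4
}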

internal/scheduler/schedulerobjects/resourcelist_test.go

Lines changed: 0 additions & 12 deletions

@@ -119,18 +119,6 @@ func TestResourceListEqual(t *testing.T) {
 	}
 }
 
-func TestResourceListZero(t *testing.T) {
-	rl := ResourceList{
-		Resources: map[string]resource.Quantity{
-			"foo": resource.MustParse("1"),
-			"bar": resource.MustParse("10Gi"),
-			"baz": resource.MustParse("0"),
-		},
-	}
-	rl.Zero()
-	assert.True(t, rl.Equal(ResourceList{}))
-}
-
 func TestV1ResourceListConversion(t *testing.T) {
 	rl := ResourceList{
 		Resources: map[string]resource.Quantity{
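With ResourceList.Zero gone, TestResourceListZero goes with it. The commit does not add a counterpart for the new ComputeResources.Zero; below is a hypothetical sketch of what such a test could look like, with the type and methods redeclared locally so it compiles on its own:

package resourcetest

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"k8s.io/apimachinery/pkg/api/resource"
)

// ComputeResources, Zero, and IsZero are redeclared locally so this sketch
// is self-contained; the real ones live in internal/common/resource.
type ComputeResources map[string]resource.Quantity

func (a ComputeResources) Zero() {
	for key := range a {
		a[key] = *resource.NewQuantity(0, resource.BinarySI)
	}
}

func (a ComputeResources) IsZero() bool {
	for _, q := range a {
		if !q.IsZero() {
			return false
		}
	}
	return true
}

// TestComputeResourcesZero is a hypothetical counterpart to the deleted
// TestResourceListZero; the commit itself does not add it.
func TestComputeResourcesZero(t *testing.T) {
	cr := ComputeResources{
		"foo": resource.MustParse("1"),
		"bar": resource.MustParse("10Gi"),
		"baz": resource.MustParse("0"),
	}
	cr.Zero()
	assert.True(t, cr.IsZero())
}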
