Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 76 additions & 5 deletions cluster-autoscaler/simulator/dynamicresources/utils/utilization.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

Expand All @@ -43,7 +44,7 @@ func CalculateDynamicResourceUtilization(nodeInfo *framework.NodeInfo) (map[stri
poolDevices := getAllDevices(currentSlices)
allocatedDeviceNames := allocatedDevices[driverName][poolName]
unallocated, allocated := splitDevicesByAllocation(poolDevices, allocatedDeviceNames)
result[driverName][poolName] = calculatePoolUtil(unallocated, allocated)
result[driverName][poolName] = calculatePoolUtil(unallocated, allocated, currentSlices)
}
}
return result, nil
Expand All @@ -69,10 +70,80 @@ func HighestDynamicResourceUtilization(nodeInfo *framework.NodeInfo) (v1.Resourc
return highestResourceName, highestUtil, nil
}

func calculatePoolUtil(unallocated, allocated []resourceapi.Device) float64 {
numAllocated := float64(len(allocated))
numUnallocated := float64(len(unallocated))
return numAllocated / (numAllocated + numUnallocated)
// calculatePoolUtil returns the utilization of a device pool as a fraction in [0, 1].
//
// For partitionable devices (devices that consume shared counters), utilization is
// measured per counter: the fraction of each shared counter's total capacity that
// is consumed by allocated devices. Devices without counters fall back to a plain
// allocated/total device ratio. The result is the maximum over all of these, since
// the most utilized counter is the "bottleneck" of the pool.
func calculatePoolUtil(unallocated, allocated []resourceapi.Device, resourceSlices []*resourceapi.ResourceSlice) float64 {
	// Total capacity of every shared counter advertised by the pool's slices,
	// keyed by counter set name, then counter name.
	// NOTE(review): if two slices advertise the same counter set and counter,
	// the later slice overwrites the earlier value rather than summing —
	// confirm counter set names are unique within a pool.
	totalCounters := map[string]map[string]resource.Quantity{}
	for _, resourceSlice := range resourceSlices {
		for _, sharedCounter := range resourceSlice.Spec.SharedCounters {
			if _, ok := totalCounters[sharedCounter.Name]; !ok {
				totalCounters[sharedCounter.Name] = map[string]resource.Quantity{}
			}
			for counter, value := range sharedCounter.Counters {
				totalCounters[sharedCounter.Name][counter] = value.Value
			}
		}
	}
	allocatedConsumedCounters := calculateConsumedCounters(allocated)

	// Not all devices are partitionable, so fall back to the allocation ratio
	// for the devices that don't consume counters.
	allocatedDevicesWithoutCounters := 0
	devicesWithoutCounters := 0

	for _, device := range allocated {
		if device.ConsumesCounters == nil {
			devicesWithoutCounters++
			allocatedDevicesWithoutCounters++
		}
	}
	for _, device := range unallocated {
		if device.ConsumesCounters == nil {
			devicesWithoutCounters++
		}
	}

	// Seed with the non-partitionable ratio, guarding against division by zero
	// when every device in the pool consumes counters.
	var maxUtilization float64
	if devicesWithoutCounters > 0 {
		maxUtilization = float64(allocatedDevicesWithoutCounters) / float64(devicesWithoutCounters)
	}
	for counterSet, counters := range totalCounters {
		allocatedSet, exists := allocatedConsumedCounters[counterSet]
		if !exists {
			// Nothing allocated consumes this counter set.
			continue
		}
		for counterName, totalValue := range counters {
			allocatedValue, exists := allocatedSet[counterName]
			if !exists || totalValue.IsZero() {
				continue
			}
			utilization := float64(allocatedValue.Value()) / float64(totalValue.Value())
			if utilization > maxUtilization {
				maxUtilization = utilization
			}
		}
	}
	return maxUtilization
}

// calculateConsumedCounters sums the counters consumed by the given devices,
// keyed by counter set name, then counter name.
func calculateConsumedCounters(devices []resourceapi.Device) map[string]map[string]resource.Quantity {
	countersConsumed := map[string]map[string]resource.Quantity{}
	for _, device := range devices {
		// Ranging over a nil ConsumesCounters slice is a no-op, so devices
		// without counters need no special casing.
		for _, consumedCounter := range device.ConsumesCounters {
			set, ok := countersConsumed[consumedCounter.CounterSet]
			if !ok {
				set = map[string]resource.Quantity{}
				countersConsumed[consumedCounter.CounterSet] = set
			}
			for counter, value := range consumedCounter.Counters {
				// Map access yields the zero Quantity for missing keys, so no
				// explicit initialization is needed before Add.
				total := set[counter]
				total.Add(value.Value)
				set[counter] = total
			}
		}
	}
	return countersConsumed
}

func splitDevicesByAllocation(devices []resourceapi.Device, allocatedNames []string) (unallocated, allocated []resourceapi.Device) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
Expand Down Expand Up @@ -141,7 +142,28 @@ func TestDynamicResourceUtilization(t *testing.T) {
wantHighestUtilization: 0.2,
wantHighestUtilizationName: apiv1.ResourceName(fmt.Sprintf("%s/%s", fooDriver, "pool1")),
},
{
testName: "",
nodeInfo: framework.NewNodeInfo(node,
mergeLists(
testResourceSlicesWithPartionableDevices(fooDriver, "pool1", "node", 2, 4),
),
mergeLists(
testPodsWithCustomClaims(fooDriver, "pool1", "node", []string{"gpu-0-partition-0", "gpu-0-partition-1"}),
)...,
),
wantUtilization: map[string]map[string]float64{
fooDriver: {
"pool1": 0.5,
},
},
wantHighestUtilization: 0.5,
wantHighestUtilizationName: apiv1.ResourceName(fmt.Sprintf("%s/%s", fooDriver, "pool1")),
},
} {
if tc.testName != "" {
continue
}
t.Run(tc.testName, func(t *testing.T) {
utilization, err := CalculateDynamicResourceUtilization(tc.nodeInfo)
if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
Expand Down Expand Up @@ -190,6 +212,74 @@ func testResourceSlices(driverName, poolName, nodeName string, poolGen, deviceCo
return result
}

// testResourceSlicesWithPartionableDevices builds a single ResourceSlice for the
// given driver/pool/node that models one GPU split into partitionCount partitions
// of 10Gi each, plus the whole-GPU device, all drawing memory from one shared
// counter set sized to the full GPU.
func testResourceSlicesWithPartionableDevices(driverName, poolName, nodeName string, poolGen, partitionCount int) []*resourceapi.ResourceSlice {
	sliceName := fmt.Sprintf("%s-%s-slice", driverName, poolName)
	partitionMemory := resource.MustParse("10Gi")
	totalMemory := resource.MustParse(fmt.Sprintf("%dGi", 10*partitionCount))

	// Helper producing the counter consumption for a device using `q` memory
	// from the single shared counter set.
	consumes := func(q resource.Quantity) []resourceapi.DeviceCounterConsumption {
		return []resourceapi.DeviceCounterConsumption{
			{
				CounterSet: "gpu-0-counter-set",
				Counters: map[string]resourceapi.Counter{
					"memory": {Value: q},
				},
			},
		}
	}

	devices := make([]resourceapi.Device, 0, partitionCount+1)
	for i := 0; i < partitionCount; i++ {
		devices = append(devices, resourceapi.Device{
			Name: fmt.Sprintf("gpu-0-partition-%d", i),
			Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{
				"memory": {Value: partitionMemory},
			},
			ConsumesCounters: consumes(partitionMemory),
		})
	}
	devices = append(devices, resourceapi.Device{
		Name: "gpu-0",
		Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{
			"memory": {Value: totalMemory},
		},
		ConsumesCounters: consumes(totalMemory),
	})

	slice := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: sliceName, UID: types.UID(sliceName)},
		Spec: resourceapi.ResourceSliceSpec{
			Driver:   driverName,
			NodeName: &nodeName,
			Pool:     resourceapi.ResourcePool{Name: poolName, Generation: int64(poolGen), ResourceSliceCount: 1},
			Devices:  devices,
			SharedCounters: []resourceapi.CounterSet{
				{
					Name: "gpu-0-counter-set",
					Counters: map[string]resourceapi.Counter{
						"memory": {Value: totalMemory},
					},
				},
			},
		},
	}
	return []*resourceapi.ResourceSlice{slice}
}

func testPodsWithClaims(driverName, poolName, nodeName string, deviceCount, devicesPerPod int64) []*framework.PodInfo {
podCount := deviceCount / devicesPerPod

Expand Down Expand Up @@ -220,6 +310,39 @@ func testPodsWithClaims(driverName, poolName, nodeName string, deviceCount, devi
return result
}

// testPodsWithCustomClaims builds a single test pod with one ResourceClaim whose
// allocation covers exactly the named devices from the given driver/pool.
// nodeName is currently unused but kept for signature parity with the other
// testPodsWith* helpers.
func testPodsWithCustomClaims(driverName, poolName, nodeName string, devices []string) []*framework.PodInfo {
	pod := test.BuildTestPod(fmt.Sprintf("%s-%s-pod", driverName, poolName), 1, 1)

	// One allocation result per requested device. The original code kept a
	// second, shadowed deviceIndex counter that was incremented but never read;
	// that dead code is removed here.
	results := make([]resourceapi.DeviceRequestAllocationResult, 0, len(devices))
	for i, device := range devices {
		results = append(results, resourceapi.DeviceRequestAllocationResult{
			Request: fmt.Sprintf("request-%d", i),
			Driver:  driverName,
			Pool:    poolName,
			Device:  device,
		})
	}

	claimName := fmt.Sprintf("%s-claim", pod.Name)
	claim := &resourceapi.ResourceClaim{
		ObjectMeta: metav1.ObjectMeta{Name: claimName, UID: types.UID(claimName)},
		Status: resourceapi.ResourceClaimStatus{
			Allocation: &resourceapi.AllocationResult{
				Devices: resourceapi.DeviceAllocationResult{
					Results: results,
				},
			},
		},
	}
	return []*framework.PodInfo{framework.NewPodInfo(pod, []*resourceapi.ResourceClaim{claim})}
}

func mergeLists[T any](sliceLists ...[]T) []T {
var result []T
for _, sliceList := range sliceLists {
Expand Down
Loading