@@ -35,6 +35,7 @@ func NewCloudProvider(opts config.AutoscalingOptions, informerFactory informers.
	}

	rl := context.NewResourceLimiterFromAutoscalingOptions(opts)
+	klog.Errorf("WIP just1not2 RLCP: %v", *rl)

	if opts.CloudProviderName == "" {
		// Ideally this would be an error, but several unit tests of the
1 change: 1 addition & 0 deletions cluster-autoscaler/core/scaledown/resource/limits.go
@@ -161,6 +161,7 @@ func (lf *LimitsFinder) customResourcesTotal(context *context.AutoscalingContext
		}
	}

+	klog.Errorf("WIP just1not2 CRTLIMITS: %v", result)
	return result, nil
}

2 changes: 2 additions & 0 deletions cluster-autoscaler/core/scaleup/resource/manager.go
@@ -102,6 +102,7 @@ func (m *Manager) ResourcesLeft(ctx *context.AutoscalingContext, nodeInfos map[s
	var totalResourcesErr error
	if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
		totalResources, totalResourcesErr = m.customResourcesTotal(ctx, nodeInfos, nodesFromNotAutoscaledGroups)
+		klog.Errorf("WIP just1not2 RESOURCELEFT: %v", totalResources)
	}

	resultScaleUpLimits := make(Limits)
@@ -133,6 +134,7 @@ func (m *Manager) ResourcesLeft(ctx *context.AutoscalingContext, nodeInfos map[s
			} else {
				resultScaleUpLimits[resource] = computeBelowMax(totalResources[resource], max)
			}
+			klog.Errorf("WIP just1not2 RESOURCELEFT2: %v", resultScaleUpLimits[resource])
		default:
			klog.Errorf("Scale up limits defined for unsupported resource '%s'", resource)
		}
@@ -21,6 +21,7 @@ import (
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	"k8s.io/klog/v2"
)

// CustomResourceTarget contains information about targeted custom resources
@@ -44,5 +45,6 @@ type CustomResourcesProcessor interface {

// NewDefaultCustomResourcesProcessor returns a default instance of CustomResourcesProcessor.
func NewDefaultCustomResourcesProcessor() CustomResourcesProcessor {
-	return &GpuCustomResourcesProcessor{}
+	klog.Errorf("WIP: datadog resource processor added")
+	return &DatadogCustomResourcesProcessor{}
}
@@ -0,0 +1,116 @@
package customresources

import (
	"strconv"
	"strings"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	"k8s.io/klog/v2"
)

const datadogCustomResourceLabelPrefix = "customresource.nodegroups.datadoghq.com/"
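
// As an illustration (hypothetical label and value), a node carrying the label
//   customresource.nodegroups.datadoghq.com/squid.com_goats: "3"
// is expected to eventually expose 3 units of the custom resource
// "squid.com/goats": getDatadogCustomResources strips the prefix and maps
// underscores to slashes, since "/" cannot appear in a label name suffix.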

// DatadogCustomResourcesProcessor handles regular GPU resources and Datadog custom resources.
// It assumes that the resources may not become allocatable immediately after node creation,
// and uses node labels to predict the type and count of those resources in the meantime.
type DatadogCustomResourcesProcessor struct {
	gpuProcessor GpuCustomResourcesProcessor
}
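
// Both methods below delegate to the embedded GPU processor first and then
// layer the Datadog label-driven resources on top, so the stock GPU handling
// is preserved unchanged.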

// FilterOutNodesWithUnreadyResources removes nodes that should have custom resources but don't
// yet expose them in allocatable from the ready nodes list, and updates their status to unready
// in the all nodes list. This is a hack/workaround for nodes coming up before their resource
// configuration has finished, resulting in resources missing from their allocatable and capacity.
func (p *DatadogCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
	gpuPatchedAllNodes, gpuReadyNodes := p.gpuProcessor.FilterOutNodesWithUnreadyResources(context, allNodes, readyNodes)

	newAllNodes := make([]*apiv1.Node, 0)
	newReadyNodes := make([]*apiv1.Node, 0)
	nodesWithUnreadyDatadogResources := make(map[string]*apiv1.Node)
	for _, node := range gpuReadyNodes {
		isReady := true
		for customResource := range getDatadogCustomResources(node) {
			datadogCustomResource := apiv1.ResourceName(customResource)
			allocatable, found := node.Status.Allocatable[datadogCustomResource]
			if !found || allocatable.IsZero() {
				klog.V(3).Infof("Overriding status of node %v, which seems to have unready custom resource %v",
					node.Name, customResource)
				isReady = false
			}
		}
		if isReady {
			newReadyNodes = append(newReadyNodes, node)
		} else {
			nodesWithUnreadyDatadogResources[node.Name] = kubernetes.GetUnreadyNodeCopy(node, kubernetes.ResourceUnready)
		}
	}
	// Override any node with an unready resource with its "unready" copy
	for _, node := range gpuPatchedAllNodes {
		if newNode, found := nodesWithUnreadyDatadogResources[node.Name]; found {
			newAllNodes = append(newAllNodes, newNode)
		} else {
			newAllNodes = append(newAllNodes, node)
		}
	}
	return newAllNodes, newReadyNodes
}
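
// Example (hypothetical node states): a node labeled for "squid.com/goats" that
// already reports the resource in allocatable stays in the ready list; one that
// does not yet report it is returned as an "unready" copy, so the rest of the
// autoscaler treats it as still coming up rather than as ready capacity.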

// GetNodeResourceTargets returns a mapping of resource names to their targets.
// This includes resources which are not yet ready to use or visible in Kubernetes.
func (p *DatadogCustomResourcesProcessor) GetNodeResourceTargets(context *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]CustomResourceTarget, errors.AutoscalerError) {
	targets, err := p.gpuProcessor.GetNodeResourceTargets(context, node, nodeGroup)
	if err != nil {
		return targets, err
	}

	klog.Errorf("WIP just1not2 LABELS: %v", node.Labels)
	klog.Errorf("WIP just1not2 RESOURCES: %v", getDatadogCustomResources(node))

	for customResource, value := range getDatadogCustomResources(node) {
		var targetValue int64

		datadogCustomResource := apiv1.ResourceName(customResource)
		allocatable, found := node.Status.Allocatable[datadogCustomResource]
		if found && !allocatable.IsZero() {
			// First try to get the value from allocatable if available on the node
			targetValue = allocatable.Value()
		} else {
			// Otherwise try to deduce the resource value from node labels
			intValue, err := strconv.ParseInt(value, 10, 64)
			if err != nil {
				klog.Errorf("Failed to parse datadog custom resource %v value %v: %v", customResource, value, err)
				return targets, errors.NewAutoscalerError(errors.InternalError, "could not parse datadog label custom resource value")
			}
			targetValue = intValue
		}

		klog.Errorf("WIP just1not2 CR: %v - %v", customResource, targetValue)
		targets = append(targets, CustomResourceTarget{
			ResourceType:  customResource,
			ResourceCount: targetValue,
		})
	}

	return targets, nil
}
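
// For example (hypothetical node state): if the node carries the label shown
// above but "squid.com/goats" is not yet in its allocatable, the fallback path
// parses the label value and yields
//   CustomResourceTarget{ResourceType: "squid.com/goats", ResourceCount: 3}
// Once the resource appears in allocatable, its allocatable value wins instead.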

// CleanUp cleans up processor's internal structures.
func (p *DatadogCustomResourcesProcessor) CleanUp() {
}

func getDatadogCustomResources(node *apiv1.Node) map[string]string {
	customResources := make(map[string]string)
	if node != nil {
		for label, value := range node.Labels {
			if strings.HasPrefix(label, datadogCustomResourceLabelPrefix) {
				customResource := strings.ReplaceAll(label[len(datadogCustomResourceLabelPrefix):], "_", "/")
				customResources[customResource] = value
			}
		}
	}
	return customResources
}