Skip to content

Commit 0b6f14a

Browse files
committed
update scheduler for capacity based property scheduling
Signed-off-by: Britania Rodriguez Reyes <[email protected]>
1 parent d7d6ab8 commit 0b6f14a

File tree

14 files changed

+651
-10
lines changed

14 files changed

+651
-10
lines changed

apis/protos/azure/compute/v1/vmsizerecommender.pb.go

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apis/protos/azure/compute/v1/vmsizerecommender_grpc.pb.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

charts/hub-agent/templates/deployment.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ spec:
5454
- --cluster-unhealthy-threshold={{ .Values.clusterUnhealthyThreshold }}
5555
- --resource-snapshot-creation-minimum-interval={{ .Values.resourceSnapshotCreationMinimumInterval }}
5656
- --resource-changes-collection-duration={{ .Values.resourceChangesCollectionDuration }}
57+
{{- /* The hub agent only builds the compute client when --enable-compute-service is set
       (the flag defaults to false), so the flag has to be passed together with the endpoint. */}}
{{- if .Values.enableComputeService }}
- --enable-compute-service={{ .Values.enableComputeService }}
- --compute-service-endpoint={{ .Values.computeServiceEndpoint }}
{{- end }}
5760
ports:
5861
- name: metrics
5962
containerPort: 8080

charts/hub-agent/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ enableEvictionAPIs: true
5454
enablePprof: true
5555
pprofPort: 6065
5656

57+
enableComputeService: false
58+
computeServiceEndpoint: "http://localhost:8421"
59+
5760
hubAPIQPS: 250
5861
hubAPIBurst: 1000
5962
MaxConcurrentClusterPlacement: 100

cmd/hubagent/options/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ type Options struct {
104104
EnablePprof bool
105105
// PprofPort is the port for pprof profiling.
106106
PprofPort int
107+
// EnableComputeService indicates if the compute service is enabled.
108+
EnableComputeService bool
109+
// ComputeServiceEndpoint is the endpoint for the compute service.
110+
ComputeServiceEndpoint string
107111
// DenyModifyMemberClusterLabels indicates if the member cluster labels cannot be modified by groups (excluding system:masters)
108112
DenyModifyMemberClusterLabels bool
109113
// ResourceSnapshotCreationMinimumInterval is the minimum interval at which resource snapshots could be created.
@@ -131,6 +135,8 @@ func NewOptions() *Options {
131135
EnableResourcePlacement: true,
132136
EnablePprof: false,
133137
PprofPort: 6065,
138+
EnableComputeService: false,
139+
ComputeServiceEndpoint: "http://localhost:8421",
134140
ResourceSnapshotCreationMinimumInterval: 30 * time.Second,
135141
ResourceChangesCollectionDuration: 15 * time.Second,
136142
}
@@ -179,6 +185,8 @@ func (o *Options) AddFlags(flags *flag.FlagSet) {
179185
flags.BoolVar(&o.EnableResourcePlacement, "enable-resource-placement", true, "If set, the agents will watch for the ResourcePlacement APIs.")
180186
flags.BoolVar(&o.EnablePprof, "enable-pprof", false, "If set, the pprof profiling is enabled.")
181187
flags.IntVar(&o.PprofPort, "pprof-port", 6065, "The port for pprof profiling.")
188+
flags.BoolVar(&o.EnableComputeService, "enable-compute-service", false, "If set, the compute service is enabled.")
189+
flags.StringVar(&o.ComputeServiceEndpoint, "compute-service-endpoint", "http://localhost:8421", "The endpoint for the compute service.")
182190
flags.BoolVar(&o.DenyModifyMemberClusterLabels, "deny-modify-member-cluster-labels", false, "If set, users not in the system:masters cannot modify member cluster labels.")
183191
flags.DurationVar(&o.ResourceSnapshotCreationMinimumInterval, "resource-snapshot-creation-minimum-interval", 30*time.Second, "The minimum interval at which resource snapshots could be created.")
184192
flags.DurationVar(&o.ResourceChangesCollectionDuration, "resource-changes-collection-duration", 15*time.Second,

cmd/hubagent/workload/setup.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ import (
2222
"strings"
2323
"sync"
2424

25+
"go.goms.io/fleet/pkg/clients/azure/compute"
26+
"go.goms.io/fleet/pkg/clients/httputil"
27+
"go.goms.io/fleet/pkg/scheduler/framework/plugins/clusteraffinity/azure"
28+
2529
"k8s.io/apimachinery/pkg/runtime/schema"
2630
"k8s.io/client-go/discovery"
2731
"k8s.io/client-go/dynamic"
@@ -384,7 +388,17 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
384388

385389
// Set up the scheduler
386390
klog.Info("Setting up scheduler")
387-
defaultProfile := profile.NewDefaultProfile()
391+
profileOpts := profile.ProfileOptions{}
392+
if opts.EnableComputeService {
393+
klog.Info("Azure capacity service is enabled for scheduler")
394+
computeClient, err := compute.NewAttributeBasedVMSizeRecommenderClient(opts.ComputeServiceEndpoint, httputil.DefaultClientForAzure)
395+
if err != nil {
396+
klog.ErrorS(err, "unable to create Azure AttributeBasedVMSizeRecommenderClient")
397+
return err
398+
}
399+
profileOpts.ComputeService = azure.NewAzureCapacityService(*computeClient)
400+
}
401+
defaultProfile := profile.NewDefaultProfileWithOptions(profileOpts)
388402
defaultFramework := framework.NewFramework(defaultProfile, mgr)
389403
defaultSchedulingQueue := queue.NewSimplePlacementSchedulingQueue(
390404
queue.WithName(schedulerQueueName),
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
//
2+
//Copyright (c) Microsoft Corporation.
3+
//Licensed under the MIT license.
4+
5+
// Package azure provides Azure-specific utilities and services.
6+
package azure
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"strings"
12+
13+
"k8s.io/apimachinery/pkg/api/resource"
14+
"k8s.io/klog/v2"
15+
16+
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
17+
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
18+
computev1 "go.goms.io/fleet/apis/protos/azure/compute/v1"
19+
"go.goms.io/fleet/pkg/clients/azure/compute"
20+
)
21+
22+
// defaultAzureCapacityService is the default implementation of AzureCapacityService.
23+
type defaultAzureCapacityService struct {
24+
// ComputeClient is the Azure compute client used to validate capacity requirements.
25+
ComputeClient compute.AttributeBasedVMSizeRecommenderClient
26+
}
27+
28+
// Compile-time check to ensure defaultAzureCapacityService implements AzureCapacityService
29+
var _ AzureCapacityService = &defaultAzureCapacityService{}
30+
31+
// NewAzureCapacityService creates a new default Azure capacity service
32+
func NewAzureCapacityService(computeClient compute.AttributeBasedVMSizeRecommenderClient) AzureCapacityService {
33+
return &defaultAzureCapacityService{
34+
ComputeClient: computeClient,
35+
}
36+
}
37+
38+
// ValidateCapacityRequirement validates a capacity requirement against Azure APIs.
39+
func (s *defaultAzureCapacityService) ValidateCapacityRequirement(
40+
cluster *clusterv1beta1.MemberCluster,
41+
req placementv1beta1.PropertySelectorRequirement,
42+
) (bool, error) {
43+
subID, location := extractAzureInfoFromLabels(cluster)
44+
if subID == "" || location == "" {
45+
return false, fmt.Errorf("cluster %s does not have required Azure labels", cluster.Name)
46+
}
47+
capacity, sku, err := extractCapacityRequirements(req)
48+
if err != nil {
49+
return false, fmt.Errorf("failed to extract capacity requirement: %w", err)
50+
}
51+
52+
request := &computev1.GenerateAttributeBasedRecommendationsRequest{
53+
SubscriptionId: subID,
54+
Location: location,
55+
RegularPriorityProfile: &computev1.RegularPriorityProfile{
56+
CapacityUnitType: computev1.CapacityUnitType_CAPACITY_UNIT_TYPE_VM_INSTANCE_COUNT,
57+
TargetCapacity: uint32(capacity.Value()), //nolint:gosec // safe: capacity is derived from user input should be relatively small
58+
},
59+
ResourceProperties: &computev1.ResourceProperties{
60+
VmAttributes: &computev1.VMAttributes{
61+
AllowedVmSizes: []string{sku},
62+
},
63+
},
64+
RecommendationProperties: &computev1.RecommendationProperties{
65+
RestrictionsFilter: computev1.RecommendationProperties_RESTRICTIONS_FILTER_QUOTA_AND_OFFER_RESTRICTIONS,
66+
},
67+
}
68+
69+
respObj, err := s.ComputeClient.GenerateAttributeBasedRecommendations(context.Background(), request)
70+
if err != nil {
71+
return false, fmt.Errorf("failed to generate VM size recommendations from Azure: %w", err)
72+
}
73+
74+
available := false
75+
for _, vm := range respObj.RecommendedVmSizes.RegularVmSizes {
76+
if vm.Name == sku {
77+
available = true
78+
klog.V(2).Infof("SKU %s is available in cluster %s", sku, cluster.Name)
79+
break
80+
}
81+
}
82+
83+
return available, nil
84+
}
85+
86+
// extractAzureInfoFromLabels extracts subscription ID and location from MemberCluster labels.
87+
// Returns the subscription ID and location if found, empty strings otherwise.
88+
func extractAzureInfoFromLabels(cluster *clusterv1beta1.MemberCluster) (subscriptionID, location string) {
89+
if cluster == nil || cluster.Labels == nil {
90+
return "", ""
91+
}
92+
93+
subscriptionID = cluster.Labels[AzureSubscriptionIDLabelKey]
94+
location = cluster.Labels[AzureLocationLabelKey]
95+
96+
return subscriptionID, location
97+
}
98+
99+
// extractCapacityRequirements extracts the capacity value from a PropertySelectorRequirement.
100+
// This function is specifically designed for Azure SKU capacity properties that follow the pattern:
101+
// "kubernetes.azure.com/vm-size/{sku}/capacity"
102+
// Returns the capacity as a resource.Quantity and the SKU name if the requirement is valid,
103+
// or an error if the requirement is invalid or not a capacity property.
104+
func extractCapacityRequirements(req placementv1beta1.PropertySelectorRequirement) (*resource.Quantity, string, error) {
105+
// Extract SKU from the property name
106+
// Expected format: "kubernetes.azure.com/vm-size/{sku}/capacity"
107+
if !strings.HasSuffix(req.Name, ".capacity") {
108+
return nil, "", fmt.Errorf("invalid Azure SKU capacity property format: %q", req.Name)
109+
}
110+
111+
// Remove prefix and suffix to get the SKU
112+
sku := strings.TrimSuffix(strings.TrimPrefix(req.Name, SkuCapacityPropertyPrefix+"."), ".capacity")
113+
if sku == "" {
114+
return nil, "", fmt.Errorf("cannot extract SKU from property name: %q", req.Name)
115+
}
116+
117+
// Validate that we have exactly one value
118+
if len(req.Values) != 1 {
119+
return nil, "", fmt.Errorf("azure SKU capacity property must have exactly one value, got %d", len(req.Values))
120+
}
121+
122+
// Parse the capacity value
123+
capacity, err := resource.ParseQuantity(req.Values[0])
124+
if err != nil {
125+
return nil, "", fmt.Errorf("failed to parse capacity value %q: %w", req.Values[0], err)
126+
}
127+
128+
return &capacity, sku, nil
129+
}

0 commit comments

Comments
 (0)