diff --git a/apis/config/register.go b/apis/config/register.go
index 57a33a0cc..50f38814e 100644
--- a/apis/config/register.go
+++ b/apis/config/register.go
@@ -45,6 +45,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
 		&NetworkOverheadArgs{},
 		&SySchedArgs{},
 		&PeaksArgs{},
+		&NodeResourcesFitPlusArgs{},
+		&ScarceResourceAvoidanceArgs{},
 	)
 	return nil
 }
diff --git a/apis/config/types.go b/apis/config/types.go
index d24960833..d5f626149 100644
--- a/apis/config/types.go
+++ b/apis/config/types.go
@@ -298,3 +298,24 @@ type PowerModel struct {
 	// Power = K0 + K1 * e ^(K2 * x) : where x is utilisation
 	// Idle power of node will be K0 + K1
 }
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// ScarceResourceAvoidanceArgs defines the parameters for the ScarceResourceAvoidance plugin.
+type ScarceResourceAvoidanceArgs struct {
+	metav1.TypeMeta
+	Resources []v1.ResourceName `json:"resources,omitempty"`
+}
+
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// NodeResourcesFitPlusArgs defines the parameters for the NodeResourcesFitPlus plugin.
+type NodeResourcesFitPlusArgs struct {
+	metav1.TypeMeta
+	Resources map[v1.ResourceName]ResourcesType `json:"resources"`
+}
+
+type ResourcesType struct {
+	Type   schedconfig.ScoringStrategyType `json:"type"`
+	Weight int64                           `json:"weight"`
+}
diff --git a/apis/config/zz_generated.deepcopy.go b/apis/config/zz_generated.deepcopy.go
index 393afe2be..1c1edfe52 100644
--- a/apis/config/zz_generated.deepcopy.go
+++ b/apis/config/zz_generated.deepcopy.go
@@ -22,6 +22,7 @@ limitations under the License.
 package config
 
 import (
+	corev1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
 	runtime "k8s.io/apimachinery/pkg/runtime"
 	apisconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -452,3 +453,65 @@ func (in *TrimaranSpec) DeepCopy() *TrimaranSpec {
 	in.DeepCopyInto(out)
 	return out
 }
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ScarceResourceAvoidanceArgs) DeepCopyInto(out *ScarceResourceAvoidanceArgs) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	if in.Resources != nil {
+		in, out := &in.Resources, &out.Resources
+		*out = make([]corev1.ResourceName, len(*in))
+		copy(*out, *in)
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScarceResourceAvoidanceArgs.
+func (in *ScarceResourceAvoidanceArgs) DeepCopy() *ScarceResourceAvoidanceArgs {
+	if in == nil {
+		return nil
+	}
+	out := new(ScarceResourceAvoidanceArgs)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *ScarceResourceAvoidanceArgs) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *NodeResourcesFitPlusArgs) DeepCopyInto(out *NodeResourcesFitPlusArgs) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	if in.Resources != nil {
+		in, out := &in.Resources, &out.Resources
+		*out = make(map[corev1.ResourceName]ResourcesType, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeResourcesFitPlusArgs.
+func (in *NodeResourcesFitPlusArgs) DeepCopy() *NodeResourcesFitPlusArgs {
+	if in == nil {
+		return nil
+	}
+	out := new(NodeResourcesFitPlusArgs)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *NodeResourcesFitPlusArgs) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go
index 7c915be2c..6807f51c5 100644
--- a/cmd/scheduler/main.go
+++ b/cmd/scheduler/main.go
@@ -18,6 +18,8 @@ package main
 
 import (
 	"os"
 
+	noderesourcesfitplus "sigs.k8s.io/scheduler-plugins/pkg/noderesourcefitplus"
+	"sigs.k8s.io/scheduler-plugins/pkg/scarceresourceavoidance"
 	"k8s.io/component-base/cli"
 	_ "k8s.io/component-base/metrics/prometheus/clientgo" // for rest client metric registration
@@ -64,6 +66,8 @@ func main() {
 		// app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
 		app.WithPlugin(podstate.Name, podstate.New),
 		app.WithPlugin(qos.Name, qos.New),
+		app.WithPlugin(noderesourcesfitplus.Name, noderesourcesfitplus.New),
+		app.WithPlugin(scarceresourceavoidance.Name, scarceresourceavoidance.New),
 	)
 
 	code := cli.Run(command)
diff --git a/pkg/noderesourcefitplus/node_resources_fit_plus.go b/pkg/noderesourcefitplus/node_resources_fit_plus.go
new file mode 100644
index 000000000..2f435e298
--- /dev/null
+++ b/pkg/noderesourcefitplus/node_resources_fit_plus.go
@@ -0,0 +1,152 @@
+package noderesourcesfitplus
+
+import (
+	"context"
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/kubernetes/pkg/api/v1/resource"
+	k8sConfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
+
+	"sigs.k8s.io/scheduler-plugins/apis/config"
+)
+
+const (
+	// Name is the plugin name.
+	Name = "NodeResourcesFitPlus"
+)
+
+var (
+	_ framework.ScorePlugin = &Plugin{}
+)
+
+type Plugin struct {
+	handle framework.Handle
+	args   *config.NodeResourcesFitPlusArgs
+}
+
+func New(_ context.Context, args runtime.Object, handle framework.Handle) (framework.Plugin, error) {
+	nodeResourcesFitPlusArgs, ok := args.(*config.NodeResourcesFitPlusArgs)
+	if !ok {
+		return nil, fmt.Errorf("want args to be of type NodeResourcesFitPlusArgs, got %T", args)
+	}
+
+	return &Plugin{
+		handle: handle,
+		args:   nodeResourcesFitPlusArgs,
+	}, nil
+}
+
+func (s *Plugin) Name() string {
+	return Name
+}
+
+func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
+	nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+	if err != nil {
+		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
+	}
+
+	var nodeScore int64
+	var weightSum int64
+
+	podRequest, _ := fitsRequest(computePodResourceRequest(p).Resource, nodeInfo)
+
+	for _, requestSourceName := range podRequest {
+		v, ok := s.args.Resources[requestSourceName]
+		if !ok {
+			continue
+		}
+		fit, err := noderesources.NewFit(ctx,
+			&k8sConfig.NodeResourcesFitArgs{
+				ScoringStrategy: &k8sConfig.ScoringStrategy{
+					Type: v.Type, // MostAllocated or LeastAllocated
+					Resources: []k8sConfig.ResourceSpec{
+						{Name: string(requestSourceName), Weight: 1},
+					},
+				},
+			}, s.handle, plfeature.Features{})
+		if err != nil {
+			return 0, framework.NewStatus(framework.Error, err.Error())
+		}
+		resourceScore, status := fit.(framework.ScorePlugin).Score(ctx, state, p, nodeName)
+		if !status.IsSuccess() {
+			// Propagate the scoring failure; err is nil here, so do not use it.
+			return 0, status
+		}
+
+		nodeScore += resourceScore * v.Weight
+		weightSum += v.Weight
+	}
+
+	if weightSum == 0 {
+		return framework.MaxNodeScore, framework.NewStatus(framework.Success, "")
+	}
+	scores := nodeScore / weightSum
+
+	return scores, nil
+}
+
+func (s *Plugin) ScoreExtensions() framework.ScoreExtensions {
+	return nil
+}
+
+type preFilterState struct {
+	framework.Resource
+}
+
+func computePodResourceRequest(pod *v1.Pod) *preFilterState {
+	// The pod hasn't been scheduled yet, so we don't need to worry about InPlacePodVerticalScalingEnabled.
+	reqs := resource.PodRequests(pod, resource.PodResourcesOptions{})
+	result := &preFilterState{}
+	result.SetMaxResource(reqs)
+	return result
+}
+
+func fitsRequest(podRequest framework.Resource, nodeInfo *framework.NodeInfo) ([]v1.ResourceName, []v1.ResourceName) {
+	var podRequestResource []v1.ResourceName
+	var nodeRequestResource []v1.ResourceName
+
+	if podRequest.MilliCPU > 0 {
+		podRequestResource = append(podRequestResource, v1.ResourceCPU)
+	}
+
+	if nodeInfo.Allocatable.MilliCPU > 0 {
+		nodeRequestResource = append(nodeRequestResource, v1.ResourceCPU)
+	}
+
+	if podRequest.Memory > 0 {
+		podRequestResource = append(podRequestResource, v1.ResourceMemory)
+	}
+
+	if nodeInfo.Allocatable.Memory > 0 {
+		nodeRequestResource = append(nodeRequestResource, v1.ResourceMemory)
+	}
+
+	if podRequest.EphemeralStorage > 0 {
+		podRequestResource = append(podRequestResource, v1.ResourceEphemeralStorage)
+	}
+
+	if nodeInfo.Allocatable.EphemeralStorage > 0 {
+		nodeRequestResource = append(nodeRequestResource, v1.ResourceEphemeralStorage)
+	}
+
+	for rName, rQuant := range podRequest.ScalarResources {
+		if rQuant > 0 {
+			podRequestResource = append(podRequestResource, rName)
+		}
+	}
+
+	for rName, rQuant := range nodeInfo.Allocatable.ScalarResources {
+		if rQuant > 0 {
+			nodeRequestResource = append(nodeRequestResource, rName)
+		}
+	}
+
+	return podRequestResource, nodeRequestResource
+}
diff --git a/pkg/noderesourcefitplus/node_resources_fit_plus_test.go b/pkg/noderesourcefitplus/node_resources_fit_plus_test.go
new file mode 100644
index 000000000..88879da6d
--- /dev/null
+++ b/pkg/noderesourcefitplus/node_resources_fit_plus_test.go
@@ -0,0 +1,271 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package noderesourcesfitplus
+
+import (
+	"context"
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	clientsetfake "k8s.io/client-go/kubernetes/fake"
+	k8sConfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
+	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+	tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
+
+	"sigs.k8s.io/scheduler-plugins/apis/config"
+)
+
+func TestNodeResourceFitplus(t *testing.T) {
+	scheduledPod := v1.PodSpec{
+		Containers: []v1.Container{
+			{
+				Resources: v1.ResourceRequirements{
+					Requests: v1.ResourceList{
+						v1.ResourceCPU:    resource.MustParse("16"),
+						v1.ResourceMemory: resource.MustParse("32Gi"),
+						"nvidia.com/gpu":  resource.MustParse("4"),
+					},
+				},
+			},
+		},
+	}
+
+	tests := []struct {
+		pod          *v1.Pod
+		pods         []*v1.Pod
+		nodeInfos    []*framework.NodeInfo
+		args         config.NodeResourcesFitPlusArgs
+		wantErr      string
+		expectedList framework.NodeScoreList
+		name         string
+	}{
+		{
+			pod:       &v1.Pod{Spec: scheduledPod},
+			nodeInfos: []*framework.NodeInfo{makeNodeInfo("machine1", "96", "512Gi", "2"), makeNodeInfo("machine2", "96", "512Gi", "6")},
+			args: config.NodeResourcesFitPlusArgs{
+				Resources: map[v1.ResourceName]config.ResourcesType{
+					"nvidia.com/gpu": {Type: k8sConfig.MostAllocated, Weight: 2},
+					"cpu":            {Type: k8sConfig.LeastAllocated, Weight: 1},
+					"memory":         {Type: k8sConfig.LeastAllocated, Weight: 1},
+				},
+			},
+			expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MinNodeScore}, {Name: "machine2", Score: framework.MinNodeScore}},
+			name:         "pod requesting GPU prefers the node with higher GPU utilization",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			cs := clientsetfake.NewSimpleClientset()
+			informerFactory := informers.NewSharedInformerFactory(cs, 0)
+			podInformer := informerFactory.Core().V1().Pods().Informer()
+			for _, p := range test.pods {
+				podInformer.GetStore().Add(p)
+			}
+			registeredPlugins := []tf.RegisterPluginFunc{
+				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
+				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
+			}
+
+			pods := []*v1.Pod{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Namespace: "default",
+						Name:      "test-pod-0",
+					},
+					Spec: v1.PodSpec{
+						NodeName: "machine1",
+						Containers: []v1.Container{
+							{
+								Name: "test-container",
+								Resources: v1.ResourceRequirements{
+									Limits: v1.ResourceList{
+										v1.ResourceCPU:    resource.MustParse("16"),
+										v1.ResourceMemory: resource.MustParse("32Gi"),
+										"nvidia.com/gpu":  resource.MustParse("4"),
+									},
+									Requests: v1.ResourceList{
+										v1.ResourceCPU:    resource.MustParse("16"),
+										v1.ResourceMemory: resource.MustParse("32Gi"),
+										"nvidia.com/gpu":  resource.MustParse("4"),
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			snapshot := newTestSharedLister(pods, test.nodeInfos)
+
+			fh, err := tf.NewFramework(
+				ctx,
+				registeredPlugins,
+				"default-scheduler",
+				frameworkruntime.WithClientSet(cs),
+				frameworkruntime.WithInformerFactory(informerFactory),
+				frameworkruntime.WithSnapshotSharedLister(snapshot),
+			)
+			if err != nil {
+				t.Fatalf("fail to create framework: %s", err)
+			}
+
+			alloc, err := New(ctx, &test.args, fh)
+
+			if len(test.wantErr) != 0 {
+				if err != nil && test.wantErr != err.Error() {
+					t.Fatalf("got err %v, want %v", err.Error(), test.wantErr)
+				} else if err == nil {
+					t.Fatalf("no error produced, wanted %v", test.wantErr)
+				}
+				return
+			}
+
+			if err != nil && len(test.wantErr) == 0 {
+				t.Fatalf("failed to initialize plugin NodeResourcesFitPlus, got error: %v", err)
+			}
+			cycleState := framework.NewCycleState()
+
+			nodeScore := map[string]int64{}
+			plugin := alloc.(*Plugin)
+			for i := range test.nodeInfos {
+				score, err1 := plugin.Score(context.Background(), cycleState, test.pod, test.nodeInfos[i].Node().Name)
+				if err1 != nil {
+					t.Errorf("unexpected error: %v", err1)
+				}
+				nodeScore[test.nodeInfos[i].Node().Name] = score
+			}
+			if nodeScore["machine1"] <= nodeScore["machine2"] {
+				t.Fatalf("expected machine1 to score higher than machine2, got machine1=%d machine2=%d", nodeScore["machine1"], nodeScore["machine2"])
+			}
+		})
+	}
+}
+
+func makeNodeInfo(node string, cpu, memory, gpu string) *framework.NodeInfo {
+	ni := framework.NewNodeInfo()
+	ni.SetNode(&v1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: node},
+		Status: v1.NodeStatus{
+			Capacity: v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse(cpu),
+				v1.ResourceMemory: resource.MustParse(memory),
+				"nvidia.com/gpu":  resource.MustParse(gpu),
+			},
+			Allocatable: v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse(cpu),
+				v1.ResourceMemory: resource.MustParse(memory),
+				"nvidia.com/gpu":  resource.MustParse(gpu),
+			},
+		},
+	})
+	return ni
+}
+
+func makePod(name string, requests v1.ResourceList) *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{
+				{
+					Name: name,
+					Resources: v1.ResourceRequirements{
+						Requests: requests,
+					},
+				},
+			},
+		},
+	}
+}
+
+var _ framework.SharedLister = &testSharedLister{}
+
+type testSharedLister struct {
+	nodes       []*v1.Node
+	nodeInfos   []*framework.NodeInfo
+	nodeInfoMap map[string]*framework.NodeInfo
+}
+
+func newTestSharedLister(pods []*v1.Pod, nodes []*framework.NodeInfo) *testSharedLister {
+	nodeInfoMap := make(map[string]*framework.NodeInfo)
+	nodeInfos := make([]*framework.NodeInfo, 0)
+	nodess := make([]*v1.Node, 0)
+	for _, pod := range pods {
+		nodeName := pod.Spec.NodeName
+		if _, ok := nodeInfoMap[nodeName]; !ok {
+			nodeInfoMap[nodeName] = framework.NewNodeInfo()
+		}
+		nodeInfoMap[nodeName].AddPod(pod)
+	}
+	for _, node := range nodes {
+		nodess = append(nodess, node.Node())
+		if _, ok := nodeInfoMap[node.Node().Name]; !ok {
+			nodeInfoMap[node.Node().Name] = framework.NewNodeInfo()
+		}
+		nodeInfoMap[node.Node().Name].SetNode(node.Node())
+	}
+
+	for _, v := range nodeInfoMap {
+		nodeInfos = append(nodeInfos, v)
+	}
+
+	return &testSharedLister{
+		nodes:       nodess,
+		nodeInfos:   nodeInfos,
+		nodeInfoMap: nodeInfoMap,
+	}
+}
+
+func (f *testSharedLister) StorageInfos() framework.StorageInfoLister {
+	return f
+}
+
+func (f *testSharedLister) IsPVCUsedByPods(key string) bool {
+	return false
+}
+
+func (f *testSharedLister) NodeInfos() framework.NodeInfoLister {
+	return f
+}
+
+func (f *testSharedLister) List() ([]*framework.NodeInfo, error) {
+	return f.nodeInfos, nil
+}
+
+func (f *testSharedLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) {
+	return nil, nil
+}
+
+func (f *testSharedLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
+	return nil, nil
+}
+
+func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) {
+	return f.nodeInfoMap[nodeName], nil
+}
diff --git a/pkg/scarceresourceavoidance/scarce_resource_avoidance.go b/pkg/scarceresourceavoidance/scarce_resource_avoidance.go
new file mode 100644
index 000000000..0b4e696e8
--- /dev/null
+++ b/pkg/scarceresourceavoidance/scarce_resource_avoidance.go
@@ -0,0 +1,158 @@
+package scarceresourceavoidance
+
+import (
+	"context"
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/kubernetes/pkg/api/v1/resource"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+
+	"sigs.k8s.io/scheduler-plugins/apis/config"
+)
+
+const (
+	// Name is the plugin name.
+	Name = "ScarceResourceAvoidance"
+)
+
+var (
+	_ framework.ScorePlugin = &Plugin{}
+)
+
+type Plugin struct {
+	handle framework.Handle
+	args   *config.ScarceResourceAvoidanceArgs
+}
+
+func New(_ context.Context, args runtime.Object, handle framework.Handle) (framework.Plugin, error) {
+	scarceResourceAvoidanceArgs, ok := args.(*config.ScarceResourceAvoidanceArgs)
+	if !ok {
+		return nil, fmt.Errorf("want args to be of type ScarceResourceAvoidanceArgs, got %T", args)
+	}
+
+	return &Plugin{
+		handle: handle,
+		args:   scarceResourceAvoidanceArgs,
+	}, nil
+}
+
+func (s *Plugin) Name() string {
+	return Name
+}
+
+func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
+	nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+	if err != nil {
+		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
+	}
+
+	podRequest := computePodResourceRequest(p)
+	podRequestResource, nodeAllocatableResource := fitsRequest(podRequest.Resource, nodeInfo)
+	diffNames := difference(nodeAllocatableResource, podRequestResource)
+	intersectNames := intersection(diffNames, s.args.Resources)
+
+	if len(diffNames) == 0 || len(intersectNames) == 0 {
+		return framework.MaxNodeScore, nil
+	}
+	scores := resourceTypesScore(int64(len(intersectNames)), int64(len(diffNames)))
+
+	return scores, nil
+}
+
+func intersection(slice1, slice2 []v1.ResourceName) []v1.ResourceName {
+	m := make(map[v1.ResourceName]struct{})
+	var result []v1.ResourceName
+
+	for _, v := range slice2 {
+		m[v] = struct{}{}
+	}
+
+	for _, v := range slice1 {
+		if _, found := m[v]; found {
+			result = append(result, v)
+		}
+	}
+
+	return result
+}
+
+func difference(slice1, slice2 []v1.ResourceName) []v1.ResourceName {
+	var result []v1.ResourceName
+	m := make(map[v1.ResourceName]struct{})
+	for _, v := range slice2 {
+		m[v] = struct{}{}
+	}
+
+	for _, v := range slice1 {
+		if _, found := m[v]; !found {
+			result = append(result, v)
+		}
+	}
+
+	return result
+}
+
+func (s *Plugin) ScoreExtensions() framework.ScoreExtensions {
+	return nil
+}
+
+type preFilterState struct {
+	framework.Resource
+}
+
+func computePodResourceRequest(pod *v1.Pod) *preFilterState {
+	// The pod hasn't been scheduled yet, so we don't need to worry about InPlacePodVerticalScalingEnabled.
+	reqs := resource.PodRequests(pod, resource.PodResourcesOptions{})
+	result := &preFilterState{}
+	result.SetMaxResource(reqs)
+	return result
+}
+
+func fitsRequest(podRequest framework.Resource, nodeInfo *framework.NodeInfo) ([]v1.ResourceName, []v1.ResourceName) {
+	var podRequestResource []v1.ResourceName
+	var nodeRequestResource []v1.ResourceName
+
+	if podRequest.MilliCPU > 0 {
+		podRequestResource = append(podRequestResource, v1.ResourceCPU)
+	}
+
+	if nodeInfo.Allocatable.MilliCPU > 0 {
+		nodeRequestResource = append(nodeRequestResource, v1.ResourceCPU)
+	}
+
+	if podRequest.Memory > 0 {
+		podRequestResource = append(podRequestResource, v1.ResourceMemory)
+	}
+
+	if nodeInfo.Allocatable.Memory > 0 {
+		nodeRequestResource = append(nodeRequestResource, v1.ResourceMemory)
+	}
+
+	if podRequest.EphemeralStorage > 0 {
+		podRequestResource = append(podRequestResource, v1.ResourceEphemeralStorage)
+	}
+
+	if nodeInfo.Allocatable.EphemeralStorage > 0 {
+		nodeRequestResource = append(nodeRequestResource, v1.ResourceEphemeralStorage)
+	}
+
+	for rName, rQuant := range podRequest.ScalarResources {
+		if rQuant > 0 {
+			podRequestResource = append(podRequestResource, rName)
+		}
+	}
+
+	for rName, rQuant := range nodeInfo.Allocatable.ScalarResources {
+		if rQuant > 0 {
+			nodeRequestResource = append(nodeRequestResource, rName)
+		}
+	}
+
+	return podRequestResource, nodeRequestResource
+}
+
+func resourceTypesScore(requestsSourcesNum, allocatablesSourcesNum int64) int64 {
+	return (allocatablesSourcesNum - requestsSourcesNum) * framework.MaxNodeScore / allocatablesSourcesNum
+}
diff --git a/pkg/scarceresourceavoidance/scarce_resource_avoidance_test.go b/pkg/scarceresourceavoidance/scarce_resource_avoidance_test.go
new file mode 100644
index 000000000..83a7499d4
--- /dev/null
+++ b/pkg/scarceresourceavoidance/scarce_resource_avoidance_test.go
@@ -0,0 +1,237 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scarceresourceavoidance
+
+import (
+	"context"
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	clientsetfake "k8s.io/client-go/kubernetes/fake"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
+	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+	tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
+
+	"sigs.k8s.io/scheduler-plugins/apis/config"
+)
+
+func TestScarceResourceAvoidance(t *testing.T) {
+	scheduledPod := v1.PodSpec{
+		Containers: []v1.Container{
+			{
+				Resources: v1.ResourceRequirements{
+					Requests: v1.ResourceList{
+						v1.ResourceCPU:    resource.MustParse("16"),
+						v1.ResourceMemory: resource.MustParse("32Gi"),
+					},
+				},
+			},
+		},
+	}
+
+	tests := []struct {
+		pod          *v1.Pod
+		pods         []*v1.Pod
+		nodeInfos    []*framework.NodeInfo
+		args         config.ScarceResourceAvoidanceArgs
+		wantErr      string
+		expectedList framework.NodeScoreList
+		name         string
+	}{
+		{
+			pod:       &v1.Pod{Spec: scheduledPod},
+			nodeInfos: []*framework.NodeInfo{makeGPUNodeInfo("machine1", "96", "512Gi", "2"), makeNotGPUNodeInfo("machine2", "96", "512Gi")},
+			args: config.ScarceResourceAvoidanceArgs{
+				Resources: []v1.ResourceName{
+					"nvidia.com/gpu",
+				},
+			},
+			expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MinNodeScore}, {Name: "machine2", Score: framework.MinNodeScore}},
+			name:         "pod without GPU request avoids the GPU node",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			cs := clientsetfake.NewSimpleClientset()
+			informerFactory := informers.NewSharedInformerFactory(cs, 0)
+			podInformer := informerFactory.Core().V1().Pods().Informer()
+			for _, p := range test.pods {
+				podInformer.GetStore().Add(p)
+			}
+			registeredPlugins := []tf.RegisterPluginFunc{
+				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
+				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
+			}
+
+			snapshot := newTestSharedLister(nil, test.nodeInfos)
+
+			fh, err := tf.NewFramework(
+				ctx,
+				registeredPlugins,
+				"default-scheduler",
+				frameworkruntime.WithClientSet(cs),
+				frameworkruntime.WithInformerFactory(informerFactory),
+				frameworkruntime.WithSnapshotSharedLister(snapshot),
+			)
+			if err != nil {
+				t.Fatalf("fail to create framework: %s", err)
+			}
+
+			alloc, err := New(ctx, &test.args, fh)
+
+			if len(test.wantErr) != 0 {
+				if err != nil && test.wantErr != err.Error() {
+					t.Fatalf("got err %v, want %v", err.Error(), test.wantErr)
+				} else if err == nil {
+					t.Fatalf("no error produced, wanted %v", test.wantErr)
+				}
+				return
+			}
+
+			if err != nil && len(test.wantErr) == 0 {
+				t.Fatalf("failed to initialize plugin ScarceResourceAvoidance, got error: %v", err)
+			}
+			cycleState := framework.NewCycleState()
+
+			nodeScore := map[string]int64{}
+			plugin := alloc.(*Plugin)
+			for i := range test.nodeInfos {
+				score, err1 := plugin.Score(context.Background(), cycleState, test.pod, test.nodeInfos[i].Node().Name)
+				if err1 != nil {
+					t.Errorf("unexpected error: %v", err1)
+				}
+				nodeScore[test.nodeInfos[i].Node().Name] = score
+			}
+			if nodeScore["machine1"] >= nodeScore["machine2"] {
+				t.Fatalf("expected GPU node machine1 to score lower than non-GPU node machine2, got machine1=%d machine2=%d", nodeScore["machine1"], nodeScore["machine2"])
+			}
+		})
+	}
+}
+
+func makeGPUNodeInfo(node string, cpu, memory, gpu string) *framework.NodeInfo {
+	ni := framework.NewNodeInfo()
+	ni.SetNode(&v1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: node},
+		Status: v1.NodeStatus{
+			Capacity: v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse(cpu),
+				v1.ResourceMemory: resource.MustParse(memory),
+				"nvidia.com/gpu":  resource.MustParse(gpu),
+			},
+			Allocatable: v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse(cpu),
+				v1.ResourceMemory: resource.MustParse(memory),
+				"nvidia.com/gpu":  resource.MustParse(gpu),
+			},
+		},
+	})
+	return ni
+}
+
+func makeNotGPUNodeInfo(node string, cpu, memory string) *framework.NodeInfo {
+	ni := framework.NewNodeInfo()
+	ni.SetNode(&v1.Node{
+		ObjectMeta: metav1.ObjectMeta{Name: node},
+		Status: v1.NodeStatus{
+			Capacity: v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse(cpu),
+				v1.ResourceMemory: resource.MustParse(memory),
+			},
+			Allocatable: v1.ResourceList{
+				v1.ResourceCPU:    resource.MustParse(cpu),
+				v1.ResourceMemory: resource.MustParse(memory),
+			},
+		},
+	})
+	return ni
+}
+
+var _ framework.SharedLister = &testSharedLister{}
+
+type testSharedLister struct {
+	nodes       []*v1.Node
+	nodeInfos   []*framework.NodeInfo
+	nodeInfoMap map[string]*framework.NodeInfo
+}
+
+func newTestSharedLister(pods []*v1.Pod, nodes []*framework.NodeInfo) *testSharedLister {
+	nodeInfoMap := make(map[string]*framework.NodeInfo)
+	nodeInfos := make([]*framework.NodeInfo, 0)
+	nodess := make([]*v1.Node, 0)
+	for _, pod := range pods {
+		nodeName := pod.Spec.NodeName
+		if _, ok := nodeInfoMap[nodeName]; !ok {
+			nodeInfoMap[nodeName] = framework.NewNodeInfo()
+		}
+		nodeInfoMap[nodeName].AddPod(pod)
+	}
+	for _, node := range nodes {
+		nodess = append(nodess, node.Node())
+		if _, ok := nodeInfoMap[node.Node().Name]; !ok {
+			nodeInfoMap[node.Node().Name] = framework.NewNodeInfo()
+		}
+		nodeInfoMap[node.Node().Name].SetNode(node.Node())
+	}
+
+	for _, v := range nodeInfoMap {
+		nodeInfos = append(nodeInfos, v)
+	}
+
+	return &testSharedLister{
+		nodes:       nodess,
+		nodeInfos:   nodeInfos,
+		nodeInfoMap: nodeInfoMap,
+	}
+}
+
+func (f *testSharedLister) StorageInfos() framework.StorageInfoLister {
+	return f
+}
+
+func (f *testSharedLister) IsPVCUsedByPods(key string) bool {
+	return false
+}
+
+func (f *testSharedLister) NodeInfos() framework.NodeInfoLister {
+	return f
+}
+
+func (f *testSharedLister) List() ([]*framework.NodeInfo, error) {
+	return f.nodeInfos, nil
+}
+
+func (f *testSharedLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) {
+	return nil, nil
+}
+
+func (f *testSharedLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
+	return nil, nil
+}
+
+func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) {
+	return f.nodeInfoMap[nodeName], nil
+}
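Usage sketch (not part of the diff): with both plugins registered in cmd/scheduler/main.go, they would be enabled and tuned through the scheduler's KubeSchedulerConfiguration. The snippet below is illustrative only; the field names follow the json tags on the new args types (resources, type, weight), while the scheduler name, plugin weights, and the assumption that matching versioned args decoding is wired up (this diff only adds the internal apis/config types) are hypothetical.

```yaml
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: default-scheduler
  plugins:
    score:
      enabled:
      - name: NodeResourcesFitPlus
        weight: 1
      - name: ScarceResourceAvoidance
        weight: 1
  pluginConfig:
  - name: NodeResourcesFitPlus
    args:
      resources:
        nvidia.com/gpu:
          type: MostAllocated
          weight: 2
        cpu:
          type: LeastAllocated
          weight: 1
        memory:
          type: LeastAllocated
          weight: 1
  - name: ScarceResourceAvoidance
    args:
      resources:
      - nvidia.com/gpu
```

For reference, NodeResourcesFitPlus scores each requested resource with the built-in NodeResourcesFit strategy configured per resource (MostAllocated or LeastAllocated) and returns the weighted average, while ScarceResourceAvoidance scores a node as (unrequested allocatable resource types not listed as scarce) / (all unrequested allocatable resource types) x MaxNodeScore. With the configuration above, a pod that does not request nvidia.com/gpu scores a GPU node 0 and an otherwise identical CPU/memory-only node 100, which is the behavior the unit tests assert.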