Skip to content

Commit ca5d17b

Browse files
authored
Merge pull request #522 from zwpaper/refactor-ctrl-runtime
refactor: migrate networkoverhead and topologicalsort client to ctrl runtime
2 parents 40c0fee + 7098da7 commit ca5d17b

File tree

9 files changed

+289
-278
lines changed

9 files changed

+289
-278
lines changed

pkg/networkaware/networkoverhead/networkoverhead.go

Lines changed: 86 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -22,20 +22,22 @@ import (
2222
"math"
2323
"sort"
2424

25-
v1 "k8s.io/api/core/v1"
25+
corev1 "k8s.io/api/core/v1"
2626
"k8s.io/apimachinery/pkg/labels"
2727
"k8s.io/apimachinery/pkg/runtime"
28+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2830
corelisters "k8s.io/client-go/listers/core/v1"
2931
"k8s.io/klog/v2"
3032
"k8s.io/kubernetes/pkg/scheduler/framework"
3133

34+
"sigs.k8s.io/controller-runtime/pkg/client"
35+
3236
pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config"
3337
networkawareutil "sigs.k8s.io/scheduler-plugins/pkg/networkaware/util"
3438

3539
agv1alpha1 "github.com/diktyo-io/appgroup-api/pkg/apis/appgroup/v1alpha1"
36-
aglisters "github.com/diktyo-io/appgroup-api/pkg/generated/listers/appgroup/v1alpha1"
3740
ntv1alpha1 "github.com/diktyo-io/networktopology-api/pkg/apis/networktopology/v1alpha1"
38-
ntlisters "github.com/diktyo-io/networktopology-api/pkg/generated/listers/networktopology/v1alpha1"
3941
)
4042

4143
var _ framework.PreFilterPlugin = &NetworkOverhead{}
@@ -61,10 +63,10 @@ const (
6163

6264
// NetworkOverhead : Filter and Score nodes based on Pod's AppGroup requirements: MaxNetworkCosts requirements among Pods with dependencies
6365
type NetworkOverhead struct {
64-
handle framework.Handle
66+
client.Client
67+
6568
podLister corelisters.PodLister
66-
agLister aglisters.AppGroupLister
67-
ntLister ntlisters.NetworkTopologyLister
69+
handle framework.Handle
6870
namespaces []string
6971
weightsName string
7072
ntName string
@@ -136,21 +138,25 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
136138
return nil, err
137139
}
138140

139-
agLister, err := networkawareutil.InitAppGroupInformer(handle.KubeConfig())
140-
if err != nil {
141-
return nil, err
142-
}
141+
scheme := runtime.NewScheme()
142+
143+
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
143144

144-
ntLister, err := networkawareutil.InitNetworkTopologyInformer(handle.KubeConfig())
145+
utilruntime.Must(agv1alpha1.AddToScheme(scheme))
146+
utilruntime.Must(ntv1alpha1.AddToScheme(scheme))
147+
148+
client, err := client.New(handle.KubeConfig(), client.Options{
149+
Scheme: scheme,
150+
})
145151
if err != nil {
146152
return nil, err
147153
}
148154

149155
no := &NetworkOverhead{
150-
handle: handle,
156+
Client: client,
157+
151158
podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(),
152-
agLister: agLister,
153-
ntLister: ntLister,
159+
handle: handle,
154160
namespaces: args.Namespaces,
155161
weightsName: args.WeightsName,
156162
ntName: args.NetworkTopologyName,
@@ -165,7 +171,7 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
165171
// 4. Update cost map of all nodes
166172
// 5. Get number of satisfied and violated dependencies
167173
// 6. Get final cost of the given node to be used in the score plugin
168-
func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
174+
func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) (*framework.PreFilterResult, *framework.Status) {
169175
// Init PreFilter State
170176
preFilterState := &PreFilterState{
171177
scoreEqually: true,
@@ -205,16 +211,15 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle
205211
}
206212

207213
// Return if pods are not yet allocated for the AppGroup...
208-
if pods == nil {
214+
if len(pods) == 0 {
209215
return nil, framework.NewStatus(framework.Success, "No pods yet allocated, return")
210216
}
211217

212218
// Pods already scheduled: Get Scheduled List (Deployment name, replicaID, hostname)
213219
scheduledList := networkawareutil.GetScheduledList(pods)
214-
215220
// Check if scheduledList is empty...
216-
if scheduledList == nil {
217-
klog.ErrorS(err, "Scheduled list is empty, return")
221+
if len(scheduledList) == 0 {
222+
klog.ErrorS(nil, "Scheduled list is empty, return")
218223
return nil, framework.NewStatus(framework.Success, "Scheduled list is empty, return")
219224
}
220225

@@ -238,7 +243,10 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle
238243
// retrieve region and zone labels
239244
region := networkawareutil.GetNodeRegion(nodeInfo.Node())
240245
zone := networkawareutil.GetNodeZone(nodeInfo.Node())
241-
klog.V(6).InfoS("Node info", "name", nodeInfo.Node().Name, "region", region, "zone", zone)
246+
klog.V(6).InfoS("Node info",
247+
"name", nodeInfo.Node().Name,
248+
"region", region,
249+
"zone", zone)
242250

243251
// Create map for cost / destinations. Search for requirements faster...
244252
costMap := make(map[networkawareutil.CostKey]int64)
@@ -295,18 +303,29 @@ func (no *NetworkOverhead) PreFilterExtensions() framework.PreFilterExtensions {
295303

296304
// AddPod from pre-computed data in cycleState.
297305
// no current need for the NetworkOverhead plugin
298-
func (no *NetworkOverhead) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
306+
func (no *NetworkOverhead) AddPod(ctx context.Context,
307+
cycleState *framework.CycleState,
308+
podToSchedule *corev1.Pod,
309+
podToAdd *framework.PodInfo,
310+
nodeInfo *framework.NodeInfo) *framework.Status {
299311
return framework.NewStatus(framework.Success, "")
300312
}
301313

302314
// RemovePod from pre-computed data in cycleState.
303315
// no current need for the NetworkOverhead plugin
304-
func (no *NetworkOverhead) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
316+
func (no *NetworkOverhead) RemovePod(ctx context.Context,
317+
cycleState *framework.CycleState,
318+
podToSchedule *corev1.Pod,
319+
podToRemove *framework.PodInfo,
320+
nodeInfo *framework.NodeInfo) *framework.Status {
305321
return framework.NewStatus(framework.Success, "")
306322
}
307323

308324
// Filter : evaluate if node can respect maxNetworkCost requirements
309-
func (no *NetworkOverhead) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
325+
func (no *NetworkOverhead) Filter(ctx context.Context,
326+
cycleState *framework.CycleState,
327+
pod *corev1.Pod,
328+
nodeInfo *framework.NodeInfo) *framework.Status {
310329
if nodeInfo.Node() == nil {
311330
return framework.NewStatus(framework.Error, "node not found")
312331
}
@@ -338,7 +357,10 @@ func (no *NetworkOverhead) Filter(ctx context.Context, cycleState *framework.Cyc
338357
}
339358

340359
// Score : evaluate score for a node
341-
func (no *NetworkOverhead) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
360+
func (no *NetworkOverhead) Score(ctx context.Context,
361+
cycleState *framework.CycleState,
362+
pod *corev1.Pod,
363+
nodeName string) (int64, *framework.Status) {
342364
score := framework.MinNodeScore
343365

344366
// Get PreFilterState
@@ -360,7 +382,10 @@ func (no *NetworkOverhead) Score(ctx context.Context, cycleState *framework.Cycl
360382
}
361383

362384
// NormalizeScore : normalize scores since lower scores correspond to lower latency
363-
func (no *NetworkOverhead) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
385+
func (no *NetworkOverhead) NormalizeScore(ctx context.Context,
386+
state *framework.CycleState,
387+
pod *corev1.Pod,
388+
scores framework.NodeScoreList) *framework.Status {
364389
klog.V(4).InfoS("before normalization: ", "scores", scores)
365390

366391
// Get Min and Max Scores to normalize between framework.MaxNodeScore and framework.MinNodeScore
@@ -415,7 +440,11 @@ func (no *NetworkOverhead) sortNetworkTopologyCosts(networkTopology *ntv1alpha1.
415440
}
416441

417442
// populateCostMap : Populates costMap based on the node being filtered/scored
418-
func (no *NetworkOverhead) populateCostMap(costMap map[networkawareutil.CostKey]int64, networkTopology *ntv1alpha1.NetworkTopology, region string, zone string) {
443+
func (no *NetworkOverhead) populateCostMap(
444+
costMap map[networkawareutil.CostKey]int64,
445+
networkTopology *ntv1alpha1.NetworkTopology,
446+
region string,
447+
zone string) {
419448
for _, w := range networkTopology.Spec.Weights { // Check the weights List
420449
if w.Name != no.weightsName { // If it is not the Preferred algorithm, continue
421450
continue
@@ -463,9 +492,14 @@ func (no *NetworkOverhead) populateCostMap(costMap map[networkawareutil.CostKey]
463492
}
464493

465494
// checkMaxNetworkCostRequirements : verifies the number of met and unmet dependencies based on the pod being filtered
466-
func checkMaxNetworkCostRequirements(scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeInfo *framework.NodeInfo, region string,
467-
zone string, costMap map[networkawareutil.CostKey]int64, no *NetworkOverhead) (int64, int64, error) {
468-
495+
func checkMaxNetworkCostRequirements(
496+
scheduledList networkawareutil.ScheduledList,
497+
dependencyList []agv1alpha1.DependenciesInfo,
498+
nodeInfo *framework.NodeInfo,
499+
region string,
500+
zone string,
501+
costMap map[networkawareutil.CostKey]int64,
502+
no *NetworkOverhead) (int64, int64, error) {
469503
var satisfied int64 = 0
470504
var violated int64 = 0
471505

@@ -533,9 +567,13 @@ func checkMaxNetworkCostRequirements(scheduledList networkawareutil.ScheduledLis
533567
}
534568

535569
// getAccumulatedCost : calculate the accumulated cost based on the Pod's dependencies
536-
func (no *NetworkOverhead) getAccumulatedCost(scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeName string, region string,
537-
zone string, costMap map[networkawareutil.CostKey]int64) (int64, error) {
538-
570+
func (no *NetworkOverhead) getAccumulatedCost(
571+
scheduledList networkawareutil.ScheduledList,
572+
dependencyList []agv1alpha1.DependenciesInfo,
573+
nodeName string,
574+
region string,
575+
zone string,
576+
costMap map[networkawareutil.CostKey]int64) (int64, error) {
539577
// keep track of the accumulated cost
540578
var cost int64 = 0
541579

@@ -610,14 +648,18 @@ func getPreFilterState(cycleState *framework.CycleState) (*PreFilterState, error
610648
func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *agv1alpha1.AppGroup {
611649
klog.V(6).InfoS("namespaces: %s", no.namespaces)
612650
for _, namespace := range no.namespaces {
613-
klog.V(6).InfoS("appGroup CR", "namespace", namespace, "ag.lister", no.agLister)
651+
klog.V(6).InfoS("appGroup CR", "namespace", namespace, "name", agName)
614652
// An AppGroup cannot be placed in several namespaces simultaneously
615-
appGroup, err := no.agLister.AppGroups(namespace).Get(agName)
653+
appGroup := &agv1alpha1.AppGroup{}
654+
err := no.Get(context.TODO(), client.ObjectKey{
655+
Namespace: namespace,
656+
Name: agName,
657+
}, appGroup)
616658
if err != nil {
617-
klog.V(4).InfoS("Cannot get AppGroup from AppGroupNamespaceLister:", "error", err)
659+
klog.V(4).ErrorS(err, "Cannot get AppGroup from AppGroupNamespaceLister:")
618660
continue
619661
}
620-
if appGroup != nil {
662+
if appGroup != nil && appGroup.GetUID() != "" {
621663
return appGroup
622664
}
623665
}
@@ -627,14 +669,18 @@ func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *agv1alpha
627669
func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead() *ntv1alpha1.NetworkTopology {
628670
klog.V(6).InfoS("namespaces: %s", no.namespaces)
629671
for _, namespace := range no.namespaces {
630-
klog.V(6).InfoS("networkTopology CR:", "namespace", namespace, "nt.lister", no.ntLister)
672+
klog.V(6).InfoS("networkTopology CR:", "namespace", namespace, "name", no.ntName)
631673
// A NetworkTopology cannot be placed in several namespaces simultaneously
632-
networkTopology, err := no.ntLister.NetworkTopologies(namespace).Get(no.ntName)
674+
networkTopology := &ntv1alpha1.NetworkTopology{}
675+
err := no.Get(context.TODO(), client.ObjectKey{
676+
Namespace: namespace,
677+
Name: no.ntName,
678+
}, networkTopology)
633679
if err != nil {
634-
klog.V(4).InfoS("Cannot get networkTopology from networkTopologyNamespaceLister:", "error", err)
680+
klog.V(4).ErrorS(err, "Cannot get networkTopology from networkTopologyNamespaceLister:")
635681
continue
636682
}
637-
if networkTopology != nil {
683+
if networkTopology != nil && networkTopology.GetUID() != "" {
638684
return networkTopology
639685
}
640686
}

0 commit comments

Comments
 (0)