Skip to content

Commit 019b662

Browse files
authored
Merge pull request kubernetes#84017 from ahg-g/ahg-csi
Remove CSINode from scheduler cache.
2 parents e1685b5 + a772722 commit 019b662

File tree

12 files changed

+50
-163
lines changed

12 files changed

+50
-163
lines changed

pkg/scheduler/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ go_library(
3030
"//pkg/scheduler/volumebinder:go_default_library",
3131
"//staging/src/k8s.io/api/core/v1:go_default_library",
3232
"//staging/src/k8s.io/api/storage/v1:go_default_library",
33-
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
3433
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
3534
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
3635
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",

pkg/scheduler/algorithm/predicates/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ go_library(
4040
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
4141
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
4242
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
43+
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
4344
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
4445
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
4546
"//staging/src/k8s.io/csi-translation-lib:go_default_library",

pkg/scheduler/algorithm/predicates/predicates.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626
"k8s.io/klog"
2727

2828
v1 "k8s.io/api/core/v1"
29-
storagev1 "k8s.io/api/storage/v1"
30-
storagev1beta1 "k8s.io/api/storage/v1beta1"
29+
storage "k8s.io/api/storage/v1"
30+
v1beta1storage "k8s.io/api/storage/v1beta1"
3131
apierrors "k8s.io/apimachinery/pkg/api/errors"
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3333
"k8s.io/apimachinery/pkg/fields"
@@ -37,6 +37,7 @@ import (
3737
utilfeature "k8s.io/apiserver/pkg/util/feature"
3838
corelisters "k8s.io/client-go/listers/core/v1"
3939
storagelisters "k8s.io/client-go/listers/storage/v1"
40+
v1beta1storagelisters "k8s.io/client-go/listers/storage/v1beta1"
4041
volumehelpers "k8s.io/cloud-provider/volume/helpers"
4142
csilibplugins "k8s.io/csi-translation-lib/plugins"
4243
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -169,7 +170,19 @@ type NodeInfo interface {
169170

170171
// CSINodeInfo interface represents anything that can get CSINode object from node name.
171172
type CSINodeInfo interface {
172-
GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error)
173+
GetCSINodeInfo(nodeName string) (*v1beta1storage.CSINode, error)
174+
}
175+
176+
var _ CSINodeInfo = &CachedCSINodeInfo{}
177+
178+
// CachedCSINodeInfo implements CSINodeInfoInfo
179+
type CachedCSINodeInfo struct {
180+
v1beta1storagelisters.CSINodeLister
181+
}
182+
183+
// GetCSINodeInfo returns a persistent volume object by PV ID.
184+
func (c *CachedCSINodeInfo) GetCSINodeInfo(nodeName string) (*v1beta1storage.CSINode, error) {
185+
return c.Get(nodeName)
173186
}
174187

175188
// PersistentVolumeInfo interface represents anything that can get persistent volume object by PV ID.
@@ -209,7 +222,7 @@ func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace
209222

210223
// StorageClassInfo interface represents anything that can get a storage class object by class name.
211224
type StorageClassInfo interface {
212-
GetStorageClassInfo(className string) (*storagev1.StorageClass, error)
225+
GetStorageClassInfo(className string) (*storage.StorageClass, error)
213226
}
214227

215228
var _ StorageClassInfo = &CachedStorageClassInfo{}
@@ -220,7 +233,7 @@ type CachedStorageClassInfo struct {
220233
}
221234

222235
// GetStorageClassInfo get StorageClass by class name.
223-
func (c *CachedStorageClassInfo) GetStorageClassInfo(className string) (*storagev1.StorageClass, error) {
236+
func (c *CachedStorageClassInfo) GetStorageClassInfo(className string) (*storage.StorageClass, error) {
224237
return c.Get(className)
225238
}
226239

@@ -313,9 +326,9 @@ type VolumeFilter struct {
313326
FilterVolume func(vol *v1.Volume) (id string, relevant bool)
314327
FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool)
315328
// MatchProvisioner evaluates if the StorageClass provisioner matches the running predicate
316-
MatchProvisioner func(sc *storagev1.StorageClass) (relevant bool)
329+
MatchProvisioner func(sc *storage.StorageClass) (relevant bool)
317330
// IsMigrated returns a boolean specifying whether the plugin is migrated to a CSI driver
318-
IsMigrated func(csiNode *storagev1beta1.CSINode) bool
331+
IsMigrated func(csiNode *v1beta1storage.CSINode) bool
319332
}
320333

321334
// NewMaxPDVolumeCountPredicate creates a predicate which evaluates whether a pod can fit based on the
@@ -577,14 +590,14 @@ var EBSVolumeFilter = VolumeFilter{
577590
return "", false
578591
},
579592

580-
MatchProvisioner: func(sc *storagev1.StorageClass) (relevant bool) {
593+
MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
581594
if sc.Provisioner == csilibplugins.AWSEBSInTreePluginName {
582595
return true
583596
}
584597
return false
585598
},
586599

587-
IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
600+
IsMigrated: func(csiNode *v1beta1storage.CSINode) bool {
588601
return isCSIMigrationOn(csiNode, csilibplugins.AWSEBSInTreePluginName)
589602
},
590603
}
@@ -605,14 +618,14 @@ var GCEPDVolumeFilter = VolumeFilter{
605618
return "", false
606619
},
607620

608-
MatchProvisioner: func(sc *storagev1.StorageClass) (relevant bool) {
621+
MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
609622
if sc.Provisioner == csilibplugins.GCEPDInTreePluginName {
610623
return true
611624
}
612625
return false
613626
},
614627

615-
IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
628+
IsMigrated: func(csiNode *v1beta1storage.CSINode) bool {
616629
return isCSIMigrationOn(csiNode, csilibplugins.GCEPDInTreePluginName)
617630
},
618631
}
@@ -633,14 +646,14 @@ var AzureDiskVolumeFilter = VolumeFilter{
633646
return "", false
634647
},
635648

636-
MatchProvisioner: func(sc *storagev1.StorageClass) (relevant bool) {
649+
MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
637650
if sc.Provisioner == csilibplugins.AzureDiskInTreePluginName {
638651
return true
639652
}
640653
return false
641654
},
642655

643-
IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
656+
IsMigrated: func(csiNode *v1beta1storage.CSINode) bool {
644657
return isCSIMigrationOn(csiNode, csilibplugins.AzureDiskInTreePluginName)
645658
},
646659
}
@@ -662,14 +675,14 @@ var CinderVolumeFilter = VolumeFilter{
662675
return "", false
663676
},
664677

665-
MatchProvisioner: func(sc *storagev1.StorageClass) (relevant bool) {
678+
MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
666679
if sc.Provisioner == csilibplugins.CinderInTreePluginName {
667680
return true
668681
}
669682
return false
670683
},
671684

672-
IsMigrated: func(csiNode *storagev1beta1.CSINode) bool {
685+
IsMigrated: func(csiNode *v1beta1storage.CSINode) bool {
673686
return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
674687
},
675688
}
@@ -758,7 +771,7 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta PredicateMetadata, nodeI
758771
if class.VolumeBindingMode == nil {
759772
return false, nil, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", scName)
760773
}
761-
if *class.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
774+
if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
762775
// Skip unbound volumes
763776
continue
764777
}

pkg/scheduler/eventhandlers.go

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424

2525
v1 "k8s.io/api/core/v1"
2626
storagev1 "k8s.io/api/storage/v1"
27-
storagev1beta1 "k8s.io/api/storage/v1beta1"
2827
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2928
utilfeature "k8s.io/apiserver/pkg/util/feature"
3029
"k8s.io/client-go/informers"
@@ -159,61 +158,13 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
159158
}
160159

161160
func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
162-
csiNode, ok := obj.(*storagev1beta1.CSINode)
163-
if !ok {
164-
klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", obj)
165-
return
166-
}
167-
168-
if err := sched.SchedulerCache.AddCSINode(csiNode); err != nil {
169-
klog.Errorf("scheduler cache AddCSINode failed: %v", err)
170-
}
171-
172161
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd)
173162
}
174163

175164
func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
176-
oldCSINode, ok := oldObj.(*storagev1beta1.CSINode)
177-
if !ok {
178-
klog.Errorf("cannot convert oldObj to *storagev1beta1.CSINode: %v", oldObj)
179-
return
180-
}
181-
182-
newCSINode, ok := newObj.(*storagev1beta1.CSINode)
183-
if !ok {
184-
klog.Errorf("cannot convert newObj to *storagev1beta1.CSINode: %v", newObj)
185-
return
186-
}
187-
188-
if err := sched.SchedulerCache.UpdateCSINode(oldCSINode, newCSINode); err != nil {
189-
klog.Errorf("scheduler cache UpdateCSINode failed: %v", err)
190-
}
191-
192165
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate)
193166
}
194167

195-
func (sched *Scheduler) onCSINodeDelete(obj interface{}) {
196-
var csiNode *storagev1beta1.CSINode
197-
switch t := obj.(type) {
198-
case *storagev1beta1.CSINode:
199-
csiNode = t
200-
case cache.DeletedFinalStateUnknown:
201-
var ok bool
202-
csiNode, ok = t.Obj.(*storagev1beta1.CSINode)
203-
if !ok {
204-
klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", t.Obj)
205-
return
206-
}
207-
default:
208-
klog.Errorf("cannot convert to *storagev1beta1.CSINode: %v", t)
209-
return
210-
}
211-
212-
if err := sched.SchedulerCache.RemoveCSINode(csiNode); err != nil {
213-
klog.Errorf("scheduler cache RemoveCSINode failed: %v", err)
214-
}
215-
}
216-
217168
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
218169
if err := sched.SchedulingQueue.Add(obj.(*v1.Pod)); err != nil {
219170
utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
@@ -450,7 +401,6 @@ func AddAllEventHandlers(
450401
cache.ResourceEventHandlerFuncs{
451402
AddFunc: sched.onCSINodeAdd,
452403
UpdateFunc: sched.onCSINodeUpdate,
453-
DeleteFunc: sched.onCSINodeDelete,
454404
},
455405
)
456406
}

pkg/scheduler/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ func (c *Configurator) getAlgorithmArgs() (*PluginFactoryArgs, *plugins.ConfigPr
594594
StatefulSetLister: c.statefulSetLister,
595595
PDBLister: c.pdbLister,
596596
NodeInfo: c.schedulerCache,
597-
CSINodeInfo: c.schedulerCache,
597+
CSINodeInfo: &predicates.CachedCSINodeInfo{CSINodeLister: c.csiNodeLister},
598598
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},
599599
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: c.pVCLister},
600600
StorageClassInfo: &predicates.CachedStorageClassInfo{StorageClassLister: c.storageClassLister},

pkg/scheduler/framework/plugins/default_registry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
7070
},
7171
volumerestrictions.Name: volumerestrictions.New,
7272
volumezone.Name: volumezone.New,
73-
nodevolumelimits.Name: nodevolumelimits.New(args.SchedulerCache),
73+
nodevolumelimits.Name: nodevolumelimits.New,
7474
interpodaffinity.Name: func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
7575
return interpodaffinity.New(args.SchedulerCache, args.SchedulerCache), nil
7676
},

pkg/scheduler/framework/plugins/nodevolumelimits/node_volume_limits.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,23 @@ func (pl *NodeVolumeLimits) Filter(ctx context.Context, _ *framework.CycleState,
4949
return migration.PredicateResultToFrameworkStatus(reasons, err)
5050
}
5151

52-
// New returns function that initializes a new plugin and returns it.
53-
func New(csiNodeInfo predicates.CSINodeInfo) framework.PluginFactory {
54-
return func(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
55-
informerFactory := handle.SharedInformerFactory()
56-
pvInfo := &predicates.CachedPersistentVolumeInfo{
57-
PersistentVolumeLister: informerFactory.Core().V1().PersistentVolumes().Lister(),
58-
}
59-
pvcInfo := &predicates.CachedPersistentVolumeClaimInfo{
60-
PersistentVolumeClaimLister: informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
61-
}
62-
classInfo := &predicates.CachedStorageClassInfo{
63-
StorageClassLister: informerFactory.Storage().V1().StorageClasses().Lister(),
64-
}
65-
return &NodeVolumeLimits{
66-
predicate: predicates.NewCSIMaxVolumeLimitPredicate(csiNodeInfo, pvInfo, pvcInfo, classInfo),
67-
}, nil
52+
// New initializes a new plugin and returns it.
53+
func New(_ *runtime.Unknown, handle framework.FrameworkHandle) (framework.Plugin, error) {
54+
informerFactory := handle.SharedInformerFactory()
55+
csiNodeInfo := &predicates.CachedCSINodeInfo{
56+
CSINodeLister: informerFactory.Storage().V1beta1().CSINodes().Lister(),
6857
}
58+
pvInfo := &predicates.CachedPersistentVolumeInfo{
59+
PersistentVolumeLister: informerFactory.Core().V1().PersistentVolumes().Lister(),
60+
}
61+
pvcInfo := &predicates.CachedPersistentVolumeClaimInfo{
62+
PersistentVolumeClaimLister: informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
63+
}
64+
classInfo := &predicates.CachedStorageClassInfo{
65+
StorageClassLister: informerFactory.Storage().V1().StorageClasses().Lister(),
66+
}
67+
68+
return &NodeVolumeLimits{
69+
predicate: predicates.NewCSIMaxVolumeLimitPredicate(csiNodeInfo, pvInfo, pvcInfo, classInfo),
70+
}, nil
6971
}

pkg/scheduler/internal/cache/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ go_library(
1515
"//pkg/scheduler/nodeinfo:go_default_library",
1616
"//pkg/util/node:go_default_library",
1717
"//staging/src/k8s.io/api/core/v1:go_default_library",
18-
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
1918
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
2019
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
2120
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

pkg/scheduler/internal/cache/cache.go

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"time"
2323

2424
v1 "k8s.io/api/core/v1"
25-
storagev1beta1 "k8s.io/api/storage/v1beta1"
2625
"k8s.io/apimachinery/pkg/labels"
2726
"k8s.io/apimachinery/pkg/util/sets"
2827
"k8s.io/apimachinery/pkg/util/wait"
@@ -70,7 +69,6 @@ type schedulerCache struct {
7069
// a map from pod key to podState.
7170
podStates map[string]*podState
7271
nodes map[string]*nodeInfoListItem
73-
csiNodes map[string]*storagev1beta1.CSINode
7472
// headNode points to the most recently updated NodeInfo in "nodes". It is the
7573
// head of the linked list.
7674
headNode *nodeInfoListItem
@@ -110,7 +108,6 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
110108

111109
nodes: make(map[string]*nodeInfoListItem),
112110
nodeTree: newNodeTree(nil),
113-
csiNodes: make(map[string]*storagev1beta1.CSINode),
114111
assumedPods: make(map[string]bool),
115112
podStates: make(map[string]*podState),
116113
imageStates: make(map[string]*imageState),
@@ -578,34 +575,6 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
578575
return nil
579576
}
580577

581-
func (cache *schedulerCache) AddCSINode(csiNode *storagev1beta1.CSINode) error {
582-
cache.mu.Lock()
583-
defer cache.mu.Unlock()
584-
585-
cache.csiNodes[csiNode.Name] = csiNode
586-
return nil
587-
}
588-
589-
func (cache *schedulerCache) UpdateCSINode(oldCSINode, newCSINode *storagev1beta1.CSINode) error {
590-
cache.mu.Lock()
591-
defer cache.mu.Unlock()
592-
593-
cache.csiNodes[newCSINode.Name] = newCSINode
594-
return nil
595-
}
596-
597-
func (cache *schedulerCache) RemoveCSINode(csiNode *storagev1beta1.CSINode) error {
598-
cache.mu.Lock()
599-
defer cache.mu.Unlock()
600-
601-
_, ok := cache.csiNodes[csiNode.Name]
602-
if !ok {
603-
return fmt.Errorf("csinode %v is not found", csiNode.Name)
604-
}
605-
delete(cache.csiNodes, csiNode.Name)
606-
return nil
607-
}
608-
609578
// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in
610579
// scheduler cache. This function assumes the lock to scheduler cache has been acquired.
611580
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulernodeinfo.NodeInfo) {
@@ -711,15 +680,3 @@ func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
711680

712681
return n.info.Node(), nil
713682
}
714-
715-
func (cache *schedulerCache) GetCSINodeInfo(nodeName string) (*storagev1beta1.CSINode, error) {
716-
cache.mu.RLock()
717-
defer cache.mu.RUnlock()
718-
719-
n, ok := cache.csiNodes[nodeName]
720-
if !ok {
721-
return nil, fmt.Errorf("error retrieving csinode '%v' from cache", nodeName)
722-
}
723-
724-
return n, nil
725-
}

pkg/scheduler/internal/cache/fake/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ go_library(
1010
"//pkg/scheduler/internal/cache:go_default_library",
1111
"//pkg/scheduler/nodeinfo:go_default_library",
1212
"//staging/src/k8s.io/api/core/v1:go_default_library",
13-
"//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
1413
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
1514
],
1615
)

0 commit comments

Comments
 (0)