
Commit 6ed4997

controller: migrate nrt to ctrl runtime
1 parent 0ae911e commit 6ed4997

8 files changed (+55 −56 lines changed)

pkg/noderesourcetopology/cache/cache.go

Lines changed: 3 additions & 1 deletion
@@ -17,6 +17,8 @@ limitations under the License.
 package cache
 
 import (
+    "context"
+
     corev1 "k8s.io/api/core/v1"
 
     topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
@@ -30,7 +32,7 @@ type Interface interface {
     // The pod argument is used only for logging purposes.
     // Returns a boolean to signal the caller if the NRT data is clean. If false, then the node has foreign
     // Pods detected - so it should be ignored or handled differently by the caller.
-    GetCachedNRTCopy(nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool)
+    GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool)
 
     // NodeMaybeOverReserved declares a node was filtered out for not enough resources available.
     // This means this node is eligible for a resync. When a node is marked discarded (dirty), it matters not
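
With context.Context now part of the cache interface, callers are expected to thread the per-cycle context they receive from the scheduler framework straight into the NRT read. A minimal sketch of such a caller, reusing the package paths already present in this repo (the helper name lookupTopology is illustrative, not part of this commit):

package example

import (
    "context"
    "fmt"

    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
    corev1 "k8s.io/api/core/v1"

    nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache"
)

// lookupTopology forwards the context received from a framework callback
// (Filter/Score) into the context-aware cache interface.
func lookupTopology(ctx context.Context, cache nrtcache.Interface, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, error) {
    nodeTopology, ok := cache.GetCachedNRTCopy(ctx, nodeName, pod)
    if !ok {
        // ok == false means foreign pods were detected on the node, so the data is not clean.
        return nil, fmt.Errorf("topology data for node %q is not clean", nodeName)
    }
    if nodeTopology == nil {
        return nil, fmt.Errorf("no topology data available for node %q", nodeName)
    }
    return nodeTopology, nil
}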

pkg/noderesourcetopology/cache/discardreserved.go

Lines changed: 10 additions & 8 deletions
@@ -17,14 +17,16 @@ limitations under the License.
 package cache
 
 import (
+    "context"
     "sync"
 
+    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
+
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/klog/v2"
 
-    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
-    listerv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/listers/topology/v1alpha2"
+    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 // DiscardReserved is intended to solve similiar problem as Overreserve Cache,
@@ -42,17 +44,17 @@ import (
 type DiscardReserved struct {
     rMutex         sync.RWMutex
     reservationMap map[string]map[types.UID]bool // Key is NodeName, value is Pod UID : reserved status
-    lister         listerv1alpha2.NodeResourceTopologyLister
+    client         ctrlclient.Client
 }
 
-func NewDiscardReserved(lister listerv1alpha2.NodeResourceTopologyLister) Interface {
+func NewDiscardReserved(client ctrlclient.Client) Interface {
     return &DiscardReserved{
-        lister:         lister,
+        client:         client,
         reservationMap: make(map[string]map[types.UID]bool),
     }
 }
 
-func (pt *DiscardReserved) GetCachedNRTCopy(nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
+func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
     pt.rMutex.RLock()
     defer pt.rMutex.RUnlock()
     if t, ok := pt.reservationMap[nodeName]; ok {
@@ -61,8 +63,8 @@ func (pt *DiscardReserved) GetCachedNRTCopy(nodeName string, _ *corev1.Pod) (*to
         }
     }
 
-    nrt, err := pt.lister.Get(nodeName)
-    if err != nil {
+    nrt := &topologyv1alpha2.NodeResourceTopology{}
+    if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
         return nil, false
     }
     return nrt, true
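
NodeResourceTopology is a cluster-scoped object, so the controller-runtime Get key carries only the node name and leaves Namespace empty. A compact sketch of the lookup pattern used above, with an illustrative helper name (getNRT) and an already-configured client:

package example

import (
    "context"

    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
    "k8s.io/apimachinery/pkg/types"

    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// getNRT mirrors the lookup above: a cluster-scoped Get keyed only by name.
func getNRT(ctx context.Context, c ctrlclient.Client, nodeName string) (*topologyv1alpha2.NodeResourceTopology, error) {
    nrt := &topologyv1alpha2.NodeResourceTopology{}
    if err := c.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
        return nil, err
    }
    return nrt, nil
}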

pkg/noderesourcetopology/cache/overreserve.go

Lines changed: 18 additions & 16 deletions
@@ -17,66 +17,68 @@ limitations under the License.
 package cache
 
 import (
+    "context"
     "errors"
     "fmt"
     "sync"
     "time"
 
+    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
+    "github.com/k8stopologyawareschedwg/podfingerprint"
+
     corev1 "k8s.io/api/core/v1"
-    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/types"
     podlisterv1 "k8s.io/client-go/listers/core/v1"
     k8scache "k8s.io/client-go/tools/cache"
     "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/scheduler/framework"
 
-    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
-    listerv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/listers/topology/v1alpha2"
+    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
 
     apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
     "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
-
-    "github.com/k8stopologyawareschedwg/podfingerprint"
 )
 
 type OverReserve struct {
+    client                 ctrlclient.Client
     lock                   sync.Mutex
     nrts                   *nrtStore
     assumedResources       map[string]*resourceStore // nodeName -> resourceStore
     // nodesMaybeOverreserved counts how many times a node is filtered out. This is used as trigger condition to try
     // to resync nodes. See The documentation of Resync() below for more details.
     nodesMaybeOverreserved counter
    nodesWithForeignPods   counter
-    nrtLister              listerv1alpha2.NodeResourceTopologyLister
     podLister              podlisterv1.PodLister
     resyncMethod           apiconfig.CacheResyncMethod
 }
 
-func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, nrtLister listerv1alpha2.NodeResourceTopologyLister, podLister podlisterv1.PodLister) (*OverReserve, error) {
-    if nrtLister == nil || podLister == nil {
+func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister) (*OverReserve, error) {
+    if client == nil || podLister == nil {
         return nil, fmt.Errorf("nrtcache: received nil references")
     }
 
     resyncMethod := getCacheResyncMethod(cfg)
 
-    nrtObjs, err := nrtLister.List(labels.Everything())
-    if err != nil {
+    nrtObjs := &topologyv1alpha2.NodeResourceTopologyList{}
+    // TODO: we should pass-in a context in the future
+    if err := client.List(context.Background(), nrtObjs); err != nil {
         return nil, err
     }
 
-    klog.V(3).InfoS("nrtcache: initializing", "objects", len(nrtObjs), "method", resyncMethod)
+    klog.V(3).InfoS("nrtcache: initializing", "objects", len(nrtObjs.Items), "method", resyncMethod)
     obj := &OverReserve{
-        nrts:                   newNrtStore(nrtObjs),
+        client:                 client,
+        nrts:                   newNrtStore(nrtObjs.Items),
         assumedResources:       make(map[string]*resourceStore),
         nodesMaybeOverreserved: newCounter(),
         nodesWithForeignPods:   newCounter(),
-        nrtLister:              nrtLister,
         podLister:              podLister,
         resyncMethod:           resyncMethod,
     }
     return obj, nil
 }
 
-func (ov *OverReserve) GetCachedNRTCopy(nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
+func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
     ov.lock.Lock()
     defer ov.lock.Unlock()
     if ov.nodesWithForeignPods.IsSet(nodeName) {
@@ -208,8 +210,8 @@ func (ov *OverReserve) Resync() {
 
     var nrtUpdates []*topologyv1alpha2.NodeResourceTopology
     for _, nodeName := range nodeNames {
-        nrtCandidate, err := ov.nrtLister.Get(nodeName)
-        if err != nil {
+        nrtCandidate := &topologyv1alpha2.NodeResourceTopology{}
+        if err := ov.client.Get(context.Background(), types.NamespacedName{Name: nodeName}, nrtCandidate); err != nil {
             klog.V(3).InfoS("nrtcache: failed to get NodeTopology", "logID", logID, "node", nodeName, "error", err)
             continue
         }
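
The lister's List(labels.Everything()), which returned a []*NodeResourceTopology, becomes a typed List call that fills NodeResourceTopologyList.Items - a slice of values - which is why newNrtStore changes its parameter type in store.go below. A minimal sketch of that pattern, assuming a client already wired for the NRT API (listNRTs is an illustrative name):

package example

import (
    "context"

    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"

    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// listNRTs shows the List pattern used in NewOverReserve: the client fills a
// typed list object and the data lives in its Items value slice.
func listNRTs(ctx context.Context, c ctrlclient.Client) ([]topologyv1alpha2.NodeResourceTopology, error) {
    nrtObjs := &topologyv1alpha2.NodeResourceTopologyList{}
    if err := c.List(ctx, nrtObjs); err != nil {
        return nil, err
    }
    return nrtObjs.Items, nil
}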

pkg/noderesourcetopology/cache/passthrough.go

Lines changed: 13 additions & 9 deletions
@@ -17,27 +17,31 @@ limitations under the License.
 package cache
 
 import (
+    "context"
+
+    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
+
     corev1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/types"
     "k8s.io/klog/v2"
 
-    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
-    listerv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/listers/topology/v1alpha2"
+    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 type Passthrough struct {
-    lister listerv1alpha2.NodeResourceTopologyLister
+    client ctrlclient.Client
 }
 
-func NewPassthrough(lister listerv1alpha2.NodeResourceTopologyLister) Interface {
+func NewPassthrough(client ctrlclient.Client) Interface {
     return Passthrough{
-        lister: lister,
+        client: client,
     }
 }
 
-func (pt Passthrough) GetCachedNRTCopy(nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
-    klog.V(5).InfoS("Lister for nodeResTopoPlugin", "lister", pt.lister)
-    nrt, err := pt.lister.Get(nodeName)
-    if err != nil {
+func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
+    klog.V(5).InfoS("Lister for nodeResTopoPlugin")
+    nrt := &topologyv1alpha2.NodeResourceTopology{}
+    if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
         klog.V(5).ErrorS(err, "Cannot get NodeTopologies from NodeResourceTopologyLister")
         return nil, true
     }
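
A practical side effect of constructing the caches from a ctrlclient.Client is that they can be exercised with the controller-runtime fake client instead of hand-rolled listers. A sketch of such a test, assuming the NRT API package exposes the usual generated AddToScheme helper (this test is not part of the commit):

package cache_test

import (
    "context"
    "testing"

    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"

    "sigs.k8s.io/controller-runtime/pkg/client/fake"

    nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache"
)

func TestPassthroughGetCachedNRTCopy(t *testing.T) {
    // Register the NRT types into a fresh scheme so the fake client can serve them.
    scheme := runtime.NewScheme()
    if err := topologyv1alpha2.AddToScheme(scheme); err != nil {
        t.Fatal(err)
    }

    nrt := &topologyv1alpha2.NodeResourceTopology{
        ObjectMeta: metav1.ObjectMeta{Name: "worker-0"},
    }
    fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(nrt).Build()

    pt := nrtcache.NewPassthrough(fakeClient)
    got, ok := pt.GetCachedNRTCopy(context.Background(), "worker-0", nil)
    if !ok || got == nil || got.Name != "worker-0" {
        t.Fatalf("unexpected result: ok=%v, obj=%v", ok, got)
    }
}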

pkg/noderesourcetopology/cache/store.go

Lines changed: 2 additions & 3 deletions
@@ -27,13 +27,12 @@ import (
 
     topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
     topologyv1alpha2attr "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2/helper/attribute"
+    "github.com/k8stopologyawareschedwg/podfingerprint"
 
     apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
     "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests"
     "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
     "sigs.k8s.io/scheduler-plugins/pkg/util"
-
-    "github.com/k8stopologyawareschedwg/podfingerprint"
 )
 
 // nrtStore maps the NRT data by node name. It is not thread safe and needs to be protected by a lock.
@@ -43,7 +42,7 @@ type nrtStore struct {
 }
 
 // newNrtStore creates a new nrtStore and initializes it with copies of the provided Node Resource Topology data.
-func newNrtStore(nrts []*topologyv1alpha2.NodeResourceTopology) *nrtStore {
+func newNrtStore(nrts []topologyv1alpha2.NodeResourceTopology) *nrtStore {
     data := make(map[string]*topologyv1alpha2.NodeResourceTopology, len(nrts))
     for _, nrt := range nrts {
         data[nrt.Name] = nrt.DeepCopy()

pkg/noderesourcetopology/filter.go

Lines changed: 1 addition & 1 deletion
@@ -204,7 +204,7 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle
     }
 
     nodeName := nodeInfo.Node().Name
-    nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(nodeName, pod)
+    nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
     if !ok {
         klog.V(2).InfoS("invalid topology data", "node", nodeName)
         return framework.NewStatus(framework.Unschedulable, "invalid node topology data")

pkg/noderesourcetopology/pluginhelpers.go

Lines changed: 7 additions & 17 deletions
@@ -17,7 +17,6 @@ limitations under the License.
 package noderesourcetopology
 
 import (
-    "context"
     "fmt"
     "strconv"
     "strings"
@@ -30,8 +29,8 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/framework"
 
     topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
-    topoclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
-    topologyinformers "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/informers/externalversions"
+
+    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
 
     apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
     nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache"
@@ -43,32 +42,23 @@ const (
 )
 
 func initNodeTopologyInformer(tcfg *apiconfig.NodeResourceTopologyMatchArgs, handle framework.Handle) (nrtcache.Interface, error) {
-    topoClient, err := topoclientset.NewForConfig(handle.KubeConfig())
+    client, err := ctrlclient.New(handle.KubeConfig(), ctrlclient.Options{})
     if err != nil {
-        klog.ErrorS(err, "Cannot create clientset for NodeTopologyResource", "kubeConfig", handle.KubeConfig())
+        klog.ErrorS(err, "Cannot create client for NodeTopologyResource", "kubeConfig", handle.KubeConfig())
         return nil, err
     }
 
-    topologyInformerFactory := topologyinformers.NewSharedInformerFactory(topoClient, 0)
-    nodeTopologyInformer := topologyInformerFactory.Topology().V1alpha2().NodeResourceTopologies()
-    nodeTopologyLister := nodeTopologyInformer.Lister()
-
-    klog.V(5).InfoS("Start nodeTopologyInformer")
-    ctx := context.Background()
-    topologyInformerFactory.Start(ctx.Done())
-    topologyInformerFactory.WaitForCacheSync(ctx.Done())
-
     if tcfg.DiscardReservedNodes {
-        return nrtcache.NewDiscardReserved(nodeTopologyLister), nil
+        return nrtcache.NewDiscardReserved(client), nil
     }
 
     if tcfg.CacheResyncPeriodSeconds <= 0 {
-        return nrtcache.NewPassthrough(nodeTopologyLister), nil
+        return nrtcache.NewPassthrough(client), nil
     }
 
     podSharedInformer, podLister := nrtcache.InformerFromHandle(handle)
 
-    nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, nodeTopologyLister, podLister)
+    nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister)
     if err != nil {
         return nil, err
     }
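
ctrlclient.New with an empty Options struct falls back to the default client-go scheme, so the NodeResourceTopology types have to be registered in whatever scheme the client ends up using; this hunk does not show where that registration happens. One way to make the dependency explicit is to pass a dedicated scheme in Options, sketched below under that assumption (newNRTClient is an illustrative name, not code from this commit):

package example

import (
    topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
    "k8s.io/apimachinery/pkg/runtime"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"

    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// newNRTClient builds a controller-runtime client whose scheme knows both the
// built-in Kubernetes types and the NodeResourceTopology CRD types.
func newNRTClient(cfg *rest.Config) (ctrlclient.Client, error) {
    scheme := runtime.NewScheme()
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))
    utilruntime.Must(topologyv1alpha2.AddToScheme(scheme))
    return ctrlclient.New(cfg, ctrlclient.Options{Scheme: scheme})
}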

pkg/noderesourcetopology/score.go

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ func (tm *TopologyMatch) Score(ctx context.Context, state *framework.CycleState,
         return framework.MaxNodeScore, nil
     }
 
-    nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(nodeName, pod)
+    nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
 
     if !ok {
         klog.V(4).InfoS("noderesourcetopology is not valid for node", "node", nodeName)
