Skip to content

Commit 91d3d5a

Browse files
authored
Merge pull request #1653 from marquiz/devel/master-multiple-k8sclients
nfd-master: use separate k8s api clients for each updater
2 parents 624c02e + 8ad6210 commit 91d3d5a

File tree

3 files changed

+45
-33
lines changed

3 files changed

+45
-33
lines changed

pkg/nfd-master/nfd-master-internal_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func TestUpdateNodeObject(t *testing.T) {
167167
fakeMaster := newFakeMaster(WithKubernetesClient(fakeCli))
168168

169169
Convey("When I successfully update the node with feature labels", func() {
170-
err := fakeMaster.updateNodeObject(testNode, featureLabels, featureAnnotations, featureExtResources, nil)
170+
err := fakeMaster.updateNodeObject(fakeCli, testNode, featureLabels, featureAnnotations, featureExtResources, nil)
171171
Convey("Error is nil", func() {
172172
So(err, ShouldBeNil)
173173
})
@@ -199,7 +199,7 @@ func TestUpdateNodeObject(t *testing.T) {
199199
fakeCli.CoreV1().(*fakecorev1client.FakeCoreV1).PrependReactor("patch", "nodes", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
200200
return true, &v1.Node{}, errors.New("Fake error when patching node")
201201
})
202-
err := fakeMaster.updateNodeObject(testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
202+
err := fakeMaster.updateNodeObject(fakeCli, testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
203203

204204
Convey("Error is produced", func() {
205205
So(err, ShouldBeError)

pkg/nfd-master/nfd-master.go

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
k8sLabels "k8s.io/apimachinery/pkg/labels"
4545
"k8s.io/apimachinery/pkg/types"
4646
k8sclient "k8s.io/client-go/kubernetes"
47+
restclient "k8s.io/client-go/rest"
4748
"k8s.io/client-go/tools/leaderelection"
4849
"k8s.io/client-go/tools/leaderelection/resourcelock"
4950
"k8s.io/klog/v2"
@@ -150,6 +151,7 @@ type nfdMaster struct {
150151
healthServer *grpc.Server
151152
stop chan struct{}
152153
ready chan struct{}
154+
kubeconfig *restclient.Config
153155
k8sClient k8sclient.Interface
154156
nodeUpdaterPool *nodeUpdaterPool
155157
deniedNs
@@ -200,6 +202,7 @@ func NewNfdMaster(opts ...NfdMasterOption) (NfdMaster, error) {
200202
if err != nil {
201203
return nfd, err
202204
}
205+
nfd.kubeconfig = kubeconfig
203206
cli, err := k8sclient.NewForConfig(kubeconfig)
204207
if err != nil {
205208
return nfd, err
@@ -528,7 +531,7 @@ func (m *nfdMaster) prune() error {
528531
return nil
529532
}
530533

531-
nodes, err := m.getNodes()
534+
nodes, err := getNodes(m.k8sClient)
532535
if err != nil {
533536
return err
534537
}
@@ -537,14 +540,14 @@ func (m *nfdMaster) prune() error {
537540
klog.InfoS("pruning node...", "nodeName", node.Name)
538541

539542
// Prune labels and extended resources
540-
err := m.updateNodeObject(&node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
543+
err := m.updateNodeObject(m.k8sClient, &node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
541544
if err != nil {
542545
nodeUpdateFailures.Inc()
543546
return fmt.Errorf("failed to prune node %q: %v", node.Name, err)
544547
}
545548

546549
// Prune annotations
547-
node, err := m.getNode(node.Name)
550+
node, err := getNode(m.k8sClient, node.Name)
548551
if err != nil {
549552
return err
550553
}
@@ -564,7 +567,7 @@ func (m *nfdMaster) prune() error {
564567
// "nfd.node.kubernetes.io/master.version" annotation, if it exists.
565568
// TODO: Drop when nfdv1alpha1.MasterVersionAnnotation is removed.
566569
func (m *nfdMaster) updateMasterNode() error {
567-
node, err := m.getNode(m.nodeName)
570+
node, err := getNode(m.k8sClient, m.nodeName)
568571
if err != nil {
569572
return err
570573
}
@@ -575,7 +578,7 @@ func (m *nfdMaster) updateMasterNode() error {
575578
nil,
576579
"/metadata/annotations")
577580

578-
err = m.patchNode(node.Name, p)
581+
err = patchNode(m.k8sClient, node.Name, p)
579582
if err != nil {
580583
return fmt.Errorf("failed to patch node annotations: %w", err)
581584
}
@@ -727,12 +730,12 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
727730
}
728731
if !m.config.NoPublish {
729732
// Fetch the node object.
730-
node, err := m.getNode(r.NodeName)
733+
node, err := getNode(m.k8sClient, r.NodeName)
731734
if err != nil {
732735
return &pb.SetLabelsReply{}, err
733736
}
734737
// Create labels et al
735-
if err := m.refreshNodeFeatures(node, r.GetLabels(), r.GetFeatures()); err != nil {
738+
if err := m.refreshNodeFeatures(m.k8sClient, node, r.GetLabels(), r.GetFeatures()); err != nil {
736739
nodeUpdateFailures.Inc()
737740
return &pb.SetLabelsReply{}, err
738741
}
@@ -743,7 +746,7 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
743746
func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
744747
klog.InfoS("will process all nodes in the cluster")
745748

746-
nodes, err := m.getNodes()
749+
nodes, err := getNodes(m.k8sClient)
747750
if err != nil {
748751
return err
749752
}
@@ -755,7 +758,7 @@ func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
755758
return nil
756759
}
757760

758-
func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
761+
func (m *nfdMaster) nfdAPIUpdateOneNode(cli k8sclient.Interface, node *corev1.Node) error {
759762
if m.nfdController == nil || m.nfdController.featureLister == nil {
760763
return nil
761764
}
@@ -810,7 +813,7 @@ func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
810813
// Update node labels et al. This may also mean removing all NFD-owned
811814
// labels (et al.), for example in the case no NodeFeature objects are
812815
// present.
813-
if err := m.refreshNodeFeatures(node, features.Labels, &features.Features); err != nil {
816+
if err := m.refreshNodeFeatures(cli, node, features.Labels, &features.Features); err != nil {
814817
return err
815818
}
816819

@@ -855,7 +858,7 @@ func filterExtendedResource(name, value string, features *nfdv1alpha1.Features)
855858
return filteredValue, nil
856859
}
857860

858-
func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
861+
func (m *nfdMaster) refreshNodeFeatures(cli k8sclient.Interface, node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
859862
if m.config.AutoDefaultNs {
860863
labels = addNsToMapKeys(labels, nfdv1alpha1.FeatureLabelNs)
861864
} else if labels == nil {
@@ -889,7 +892,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]str
889892
return nil
890893
}
891894

892-
err := m.updateNodeObject(node, labels, annotations, extendedResources, taints)
895+
err := m.updateNodeObject(cli, node, labels, annotations, extendedResources, taints)
893896
if err != nil {
894897
klog.ErrorS(err, "failed to update node", "nodeName", node.Name)
895898
return err
@@ -901,7 +904,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]str
901904
// setTaints sets node taints and annotations based on the taints passed via
902905
// nodeFeatureRule custom resorce. If empty list of taints is passed, currently
903906
// NFD owned taints and annotations are removed from the node.
904-
func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
907+
func setTaints(cli k8sclient.Interface, taints []corev1.Taint, node *corev1.Node) error {
905908
// De-serialize the taints annotation into corev1.Taint type for comparision below.
906909
var err error
907910
oldTaints := []corev1.Taint{}
@@ -940,7 +943,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
940943
}
941944

942945
if taintsUpdated {
943-
if err := controller.PatchNodeTaints(context.TODO(), m.k8sClient, node.Name, node, newNode); err != nil {
946+
if err := controller.PatchNodeTaints(context.TODO(), cli, node.Name, node, newNode); err != nil {
944947
return fmt.Errorf("failed to patch the node %v", node.Name)
945948
}
946949
klog.InfoS("updated node taints", "nodeName", node.Name)
@@ -960,7 +963,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
960963

961964
patches := createPatches([]string{nfdv1alpha1.NodeTaintsAnnotation}, node.Annotations, newAnnotations, "/metadata/annotations")
962965
if len(patches) > 0 {
963-
if err := m.patchNode(node.Name, patches); err != nil {
966+
if err := patchNode(cli, node.Name, patches); err != nil {
964967
return fmt.Errorf("error while patching node object: %w", err)
965968
}
966969
klog.V(1).InfoS("patched node annotations for taints", "nodeName", node.Name)
@@ -1057,7 +1060,7 @@ func (m *nfdMaster) processNodeFeatureRule(nodeName string, features *nfdv1alpha
10571060
// updateNodeObject ensures the Kubernetes node object is up to date,
10581061
// creating new labels and extended resources where necessary and removing
10591062
// outdated ones. Also updates the corresponding annotations.
1060-
func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
1063+
func (m *nfdMaster) updateNodeObject(cli k8sclient.Interface, node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
10611064
annotations := make(Annotations)
10621065

10631066
// Store names of labels in an annotation
@@ -1110,13 +1113,13 @@ func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAn
11101113

11111114
// patch node status with extended resource changes
11121115
statusPatches := m.createExtendedResourcePatches(node, extendedResources)
1113-
err := m.patchNodeStatus(node.Name, statusPatches)
1116+
err := patchNodeStatus(cli, node.Name, statusPatches)
11141117
if err != nil {
11151118
return fmt.Errorf("error while patching extended resources: %w", err)
11161119
}
11171120

11181121
// Patch the node object in the apiserver
1119-
err = m.patchNode(node.Name, patches)
1122+
err = patchNode(cli, node.Name, patches)
11201123
if err != nil {
11211124
return fmt.Errorf("error while patching node object: %w", err)
11221125
}
@@ -1129,7 +1132,7 @@ func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAn
11291132
}
11301133

11311134
// Set taints
1132-
err = m.setTaints(taints, node)
1135+
err = setTaints(cli, taints, node)
11331136
if err != nil {
11341137
return err
11351138
}
@@ -1420,25 +1423,25 @@ func (m *nfdMaster) filterFeatureAnnotations(annotations map[string]string) map[
14201423
return outAnnotations
14211424
}
14221425

1423-
func (m *nfdMaster) getNode(nodeName string) (*corev1.Node, error) {
1424-
return m.k8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
1426+
func getNode(cli k8sclient.Interface, nodeName string) (*corev1.Node, error) {
1427+
return cli.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
14251428
}
14261429

1427-
func (m *nfdMaster) getNodes() (*corev1.NodeList, error) {
1428-
return m.k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
1430+
func getNodes(cli k8sclient.Interface) (*corev1.NodeList, error) {
1431+
return cli.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
14291432
}
14301433

1431-
func (m *nfdMaster) patchNode(nodeName string, patches []utils.JsonPatch, subresources ...string) error {
1434+
func patchNode(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch, subresources ...string) error {
14321435
if len(patches) == 0 {
14331436
return nil
14341437
}
14351438
data, err := json.Marshal(patches)
14361439
if err == nil {
1437-
_, err = m.k8sClient.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
1440+
_, err = cli.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
14381441
}
14391442
return err
14401443
}
14411444

1442-
func (m *nfdMaster) patchNodeStatus(nodeName string, patches []utils.JsonPatch) error {
1443-
return m.patchNode(nodeName, patches, "status")
1445+
func patchNodeStatus(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch) error {
1446+
return patchNode(cli, nodeName, patches, "status")
14441447
}

pkg/nfd-master/node-updater-pool.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"golang.org/x/time/rate"
2424
apierrors "k8s.io/apimachinery/pkg/api/errors"
25+
k8sclient "k8s.io/client-go/kubernetes"
2526
"k8s.io/client-go/util/workqueue"
2627
"k8s.io/klog/v2"
2728
)
@@ -41,7 +42,7 @@ func newNodeUpdaterPool(nfdMaster *nfdMaster) *nodeUpdaterPool {
4142
}
4243
}
4344

44-
func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingInterface) bool {
45+
func (u *nodeUpdaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue workqueue.RateLimitingInterface) bool {
4546
n, quit := queue.Get()
4647
if quit {
4748
return false
@@ -53,9 +54,9 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingI
5354
nodeUpdateRequests.Inc()
5455

5556
// Check if node exists
56-
if node, err := u.nfdMaster.getNode(nodeName); apierrors.IsNotFound(err) {
57+
if node, err := getNode(cli, nodeName); apierrors.IsNotFound(err) {
5758
klog.InfoS("node not found, skip update", "nodeName", nodeName)
58-
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(node); err != nil {
59+
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(cli, node); err != nil {
5960
if n := queue.NumRequeues(nodeName); n < 15 {
6061
klog.InfoS("retrying node update", "nodeName", nodeName, "lastError", err, "numRetries", n)
6162
} else {
@@ -71,7 +72,15 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingI
7172
}
7273

7374
func (u *nodeUpdaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
74-
for u.processNodeUpdateRequest(queue) {
75+
var cli k8sclient.Interface
76+
if u.nfdMaster.kubeconfig != nil {
77+
// For normal execution, initialize a separate api client for each updater
78+
cli = k8sclient.NewForConfigOrDie(u.nfdMaster.kubeconfig)
79+
} else {
80+
// For tests, re-use the api client from nfd-master
81+
cli = u.nfdMaster.k8sClient
82+
}
83+
for u.processNodeUpdateRequest(cli, queue) {
7584
}
7685
u.wg.Done()
7786
}

0 commit comments

Comments
 (0)