Skip to content

Commit 8ad6210

Browse files
committed
nfd-master: use separate k8s api clients for each updater
Sharing the same client between updater threads virtually serializes access, in practice making the effective parallelism close to 1. With this patch, in my bench cluster of 300 nodes, the time taken by updating all nodes drops from ~2 minutes to ~12 seconds (with the default parallelism of 10 node updater threads). This demonstrates the 10-fold increased parallelism from ~1 to 10. There might be other solutions that could be explored, e.g. caching nodes with an indexer/lister but otoh nfd doesn't necessarily need/want to watch every little change in each node. We only need to get the node when something in our own CRDs change (we don't react to any changes in the node object itself). Using multiple clients was the most obvious choice to solve the problem for now.
1 parent 31a56ac commit 8ad6210

File tree

3 files changed

+45
-33
lines changed

3 files changed

+45
-33
lines changed

pkg/nfd-master/nfd-master-internal_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func TestUpdateNodeObject(t *testing.T) {
167167
fakeMaster := newFakeMaster(WithKubernetesClient(fakeCli))
168168

169169
Convey("When I successfully update the node with feature labels", func() {
170-
err := fakeMaster.updateNodeObject(testNode, featureLabels, featureAnnotations, featureExtResources, nil)
170+
err := fakeMaster.updateNodeObject(fakeCli, testNode, featureLabels, featureAnnotations, featureExtResources, nil)
171171
Convey("Error is nil", func() {
172172
So(err, ShouldBeNil)
173173
})
@@ -199,7 +199,7 @@ func TestUpdateNodeObject(t *testing.T) {
199199
fakeCli.CoreV1().(*fakecorev1client.FakeCoreV1).PrependReactor("patch", "nodes", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
200200
return true, &v1.Node{}, errors.New("Fake error when patching node")
201201
})
202-
err := fakeMaster.updateNodeObject(testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
202+
err := fakeMaster.updateNodeObject(fakeCli, testNode, nil, featureAnnotations, ExtendedResources{"": ""}, nil)
203203

204204
Convey("Error is produced", func() {
205205
So(err, ShouldBeError)

pkg/nfd-master/nfd-master.go

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
k8sLabels "k8s.io/apimachinery/pkg/labels"
4545
"k8s.io/apimachinery/pkg/types"
4646
k8sclient "k8s.io/client-go/kubernetes"
47+
restclient "k8s.io/client-go/rest"
4748
"k8s.io/client-go/tools/leaderelection"
4849
"k8s.io/client-go/tools/leaderelection/resourcelock"
4950
"k8s.io/klog/v2"
@@ -150,6 +151,7 @@ type nfdMaster struct {
150151
healthServer *grpc.Server
151152
stop chan struct{}
152153
ready chan struct{}
154+
kubeconfig *restclient.Config
153155
k8sClient k8sclient.Interface
154156
nodeUpdaterPool *nodeUpdaterPool
155157
deniedNs
@@ -200,6 +202,7 @@ func NewNfdMaster(opts ...NfdMasterOption) (NfdMaster, error) {
200202
if err != nil {
201203
return nfd, err
202204
}
205+
nfd.kubeconfig = kubeconfig
203206
cli, err := k8sclient.NewForConfig(kubeconfig)
204207
if err != nil {
205208
return nfd, err
@@ -528,7 +531,7 @@ func (m *nfdMaster) prune() error {
528531
return nil
529532
}
530533

531-
nodes, err := m.getNodes()
534+
nodes, err := getNodes(m.k8sClient)
532535
if err != nil {
533536
return err
534537
}
@@ -537,14 +540,14 @@ func (m *nfdMaster) prune() error {
537540
klog.InfoS("pruning node...", "nodeName", node.Name)
538541

539542
// Prune labels and extended resources
540-
err := m.updateNodeObject(&node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
543+
err := m.updateNodeObject(m.k8sClient, &node, Labels{}, Annotations{}, ExtendedResources{}, []corev1.Taint{})
541544
if err != nil {
542545
nodeUpdateFailures.Inc()
543546
return fmt.Errorf("failed to prune node %q: %v", node.Name, err)
544547
}
545548

546549
// Prune annotations
547-
node, err := m.getNode(node.Name)
550+
node, err := getNode(m.k8sClient, node.Name)
548551
if err != nil {
549552
return err
550553
}
@@ -564,7 +567,7 @@ func (m *nfdMaster) prune() error {
564567
// "nfd.node.kubernetes.io/master.version" annotation, if it exists.
565568
// TODO: Drop when nfdv1alpha1.MasterVersionAnnotation is removed.
566569
func (m *nfdMaster) updateMasterNode() error {
567-
node, err := m.getNode(m.nodeName)
570+
node, err := getNode(m.k8sClient, m.nodeName)
568571
if err != nil {
569572
return err
570573
}
@@ -575,7 +578,7 @@ func (m *nfdMaster) updateMasterNode() error {
575578
nil,
576579
"/metadata/annotations")
577580

578-
err = m.patchNode(node.Name, p)
581+
err = patchNode(m.k8sClient, node.Name, p)
579582
if err != nil {
580583
return fmt.Errorf("failed to patch node annotations: %w", err)
581584
}
@@ -727,12 +730,12 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
727730
}
728731
if !m.config.NoPublish {
729732
// Fetch the node object.
730-
node, err := m.getNode(r.NodeName)
733+
node, err := getNode(m.k8sClient, r.NodeName)
731734
if err != nil {
732735
return &pb.SetLabelsReply{}, err
733736
}
734737
// Create labels et al
735-
if err := m.refreshNodeFeatures(node, r.GetLabels(), r.GetFeatures()); err != nil {
738+
if err := m.refreshNodeFeatures(m.k8sClient, node, r.GetLabels(), r.GetFeatures()); err != nil {
736739
nodeUpdateFailures.Inc()
737740
return &pb.SetLabelsReply{}, err
738741
}
@@ -743,7 +746,7 @@ func (m *nfdMaster) SetLabels(c context.Context, r *pb.SetLabelsRequest) (*pb.Se
743746
func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
744747
klog.InfoS("will process all nodes in the cluster")
745748

746-
nodes, err := m.getNodes()
749+
nodes, err := getNodes(m.k8sClient)
747750
if err != nil {
748751
return err
749752
}
@@ -755,7 +758,7 @@ func (m *nfdMaster) nfdAPIUpdateAllNodes() error {
755758
return nil
756759
}
757760

758-
func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
761+
func (m *nfdMaster) nfdAPIUpdateOneNode(cli k8sclient.Interface, node *corev1.Node) error {
759762
if m.nfdController == nil || m.nfdController.featureLister == nil {
760763
return nil
761764
}
@@ -810,7 +813,7 @@ func (m *nfdMaster) nfdAPIUpdateOneNode(node *corev1.Node) error {
810813
// Update node labels et al. This may also mean removing all NFD-owned
811814
// labels (et al.), for example in the case no NodeFeature objects are
812815
// present.
813-
if err := m.refreshNodeFeatures(node, features.Labels, &features.Features); err != nil {
816+
if err := m.refreshNodeFeatures(cli, node, features.Labels, &features.Features); err != nil {
814817
return err
815818
}
816819

@@ -855,7 +858,7 @@ func filterExtendedResource(name, value string, features *nfdv1alpha1.Features)
855858
return filteredValue, nil
856859
}
857860

858-
func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
861+
func (m *nfdMaster) refreshNodeFeatures(cli k8sclient.Interface, node *corev1.Node, labels map[string]string, features *nfdv1alpha1.Features) error {
859862
if m.config.AutoDefaultNs {
860863
labels = addNsToMapKeys(labels, nfdv1alpha1.FeatureLabelNs)
861864
} else if labels == nil {
@@ -889,7 +892,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]str
889892
return nil
890893
}
891894

892-
err := m.updateNodeObject(node, labels, annotations, extendedResources, taints)
895+
err := m.updateNodeObject(cli, node, labels, annotations, extendedResources, taints)
893896
if err != nil {
894897
klog.ErrorS(err, "failed to update node", "nodeName", node.Name)
895898
return err
@@ -901,7 +904,7 @@ func (m *nfdMaster) refreshNodeFeatures(node *corev1.Node, labels map[string]str
901904
// setTaints sets node taints and annotations based on the taints passed via
902905
// nodeFeatureRule custom resorce. If empty list of taints is passed, currently
903906
// NFD owned taints and annotations are removed from the node.
904-
func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
907+
func setTaints(cli k8sclient.Interface, taints []corev1.Taint, node *corev1.Node) error {
905908
// De-serialize the taints annotation into corev1.Taint type for comparision below.
906909
var err error
907910
oldTaints := []corev1.Taint{}
@@ -940,7 +943,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
940943
}
941944

942945
if taintsUpdated {
943-
if err := controller.PatchNodeTaints(context.TODO(), m.k8sClient, node.Name, node, newNode); err != nil {
946+
if err := controller.PatchNodeTaints(context.TODO(), cli, node.Name, node, newNode); err != nil {
944947
return fmt.Errorf("failed to patch the node %v", node.Name)
945948
}
946949
klog.InfoS("updated node taints", "nodeName", node.Name)
@@ -960,7 +963,7 @@ func (m *nfdMaster) setTaints(taints []corev1.Taint, node *corev1.Node) error {
960963

961964
patches := createPatches([]string{nfdv1alpha1.NodeTaintsAnnotation}, node.Annotations, newAnnotations, "/metadata/annotations")
962965
if len(patches) > 0 {
963-
if err := m.patchNode(node.Name, patches); err != nil {
966+
if err := patchNode(cli, node.Name, patches); err != nil {
964967
return fmt.Errorf("error while patching node object: %w", err)
965968
}
966969
klog.V(1).InfoS("patched node annotations for taints", "nodeName", node.Name)
@@ -1057,7 +1060,7 @@ func (m *nfdMaster) processNodeFeatureRule(nodeName string, features *nfdv1alpha
10571060
// updateNodeObject ensures the Kubernetes node object is up to date,
10581061
// creating new labels and extended resources where necessary and removing
10591062
// outdated ones. Also updates the corresponding annotations.
1060-
func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
1063+
func (m *nfdMaster) updateNodeObject(cli k8sclient.Interface, node *corev1.Node, labels Labels, featureAnnotations Annotations, extendedResources ExtendedResources, taints []corev1.Taint) error {
10611064
annotations := make(Annotations)
10621065

10631066
// Store names of labels in an annotation
@@ -1110,13 +1113,13 @@ func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAn
11101113

11111114
// patch node status with extended resource changes
11121115
statusPatches := m.createExtendedResourcePatches(node, extendedResources)
1113-
err := m.patchNodeStatus(node.Name, statusPatches)
1116+
err := patchNodeStatus(cli, node.Name, statusPatches)
11141117
if err != nil {
11151118
return fmt.Errorf("error while patching extended resources: %w", err)
11161119
}
11171120

11181121
// Patch the node object in the apiserver
1119-
err = m.patchNode(node.Name, patches)
1122+
err = patchNode(cli, node.Name, patches)
11201123
if err != nil {
11211124
return fmt.Errorf("error while patching node object: %w", err)
11221125
}
@@ -1129,7 +1132,7 @@ func (m *nfdMaster) updateNodeObject(node *corev1.Node, labels Labels, featureAn
11291132
}
11301133

11311134
// Set taints
1132-
err = m.setTaints(taints, node)
1135+
err = setTaints(cli, taints, node)
11331136
if err != nil {
11341137
return err
11351138
}
@@ -1420,25 +1423,25 @@ func (m *nfdMaster) filterFeatureAnnotations(annotations map[string]string) map[
14201423
return outAnnotations
14211424
}
14221425

1423-
func (m *nfdMaster) getNode(nodeName string) (*corev1.Node, error) {
1424-
return m.k8sClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
1426+
func getNode(cli k8sclient.Interface, nodeName string) (*corev1.Node, error) {
1427+
return cli.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
14251428
}
14261429

1427-
func (m *nfdMaster) getNodes() (*corev1.NodeList, error) {
1428-
return m.k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
1430+
func getNodes(cli k8sclient.Interface) (*corev1.NodeList, error) {
1431+
return cli.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
14291432
}
14301433

1431-
func (m *nfdMaster) patchNode(nodeName string, patches []utils.JsonPatch, subresources ...string) error {
1434+
func patchNode(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch, subresources ...string) error {
14321435
if len(patches) == 0 {
14331436
return nil
14341437
}
14351438
data, err := json.Marshal(patches)
14361439
if err == nil {
1437-
_, err = m.k8sClient.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
1440+
_, err = cli.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.JSONPatchType, data, metav1.PatchOptions{}, subresources...)
14381441
}
14391442
return err
14401443
}
14411444

1442-
func (m *nfdMaster) patchNodeStatus(nodeName string, patches []utils.JsonPatch) error {
1443-
return m.patchNode(nodeName, patches, "status")
1445+
func patchNodeStatus(cli k8sclient.Interface, nodeName string, patches []utils.JsonPatch) error {
1446+
return patchNode(cli, nodeName, patches, "status")
14441447
}

pkg/nfd-master/node-updater-pool.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"golang.org/x/time/rate"
2424
apierrors "k8s.io/apimachinery/pkg/api/errors"
25+
k8sclient "k8s.io/client-go/kubernetes"
2526
"k8s.io/client-go/util/workqueue"
2627
"k8s.io/klog/v2"
2728
)
@@ -41,7 +42,7 @@ func newNodeUpdaterPool(nfdMaster *nfdMaster) *nodeUpdaterPool {
4142
}
4243
}
4344

44-
func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingInterface) bool {
45+
func (u *nodeUpdaterPool) processNodeUpdateRequest(cli k8sclient.Interface, queue workqueue.RateLimitingInterface) bool {
4546
n, quit := queue.Get()
4647
if quit {
4748
return false
@@ -53,9 +54,9 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingI
5354
nodeUpdateRequests.Inc()
5455

5556
// Check if node exists
56-
if node, err := u.nfdMaster.getNode(nodeName); apierrors.IsNotFound(err) {
57+
if node, err := getNode(cli, nodeName); apierrors.IsNotFound(err) {
5758
klog.InfoS("node not found, skip update", "nodeName", nodeName)
58-
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(node); err != nil {
59+
} else if err := u.nfdMaster.nfdAPIUpdateOneNode(cli, node); err != nil {
5960
if n := queue.NumRequeues(nodeName); n < 15 {
6061
klog.InfoS("retrying node update", "nodeName", nodeName, "lastError", err, "numRetries", n)
6162
} else {
@@ -71,7 +72,15 @@ func (u *nodeUpdaterPool) processNodeUpdateRequest(queue workqueue.RateLimitingI
7172
}
7273

7374
func (u *nodeUpdaterPool) runNodeUpdater(queue workqueue.RateLimitingInterface) {
74-
for u.processNodeUpdateRequest(queue) {
75+
var cli k8sclient.Interface
76+
if u.nfdMaster.kubeconfig != nil {
77+
// For normal execution, initialize a separate api client for each updater
78+
cli = k8sclient.NewForConfigOrDie(u.nfdMaster.kubeconfig)
79+
} else {
80+
// For tests, re-use the api client from nfd-master
81+
cli = u.nfdMaster.k8sClient
82+
}
83+
for u.processNodeUpdateRequest(cli, queue) {
7584
}
7685
u.wg.Done()
7786
}

0 commit comments

Comments
 (0)