From 7ec308ce0bf578440fc6d0ea34768f7fc3d3dbc3 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Mon, 3 Mar 2025 22:03:34 +0000 Subject: [PATCH 1/3] use latest node object from workqueue when processing the queue --- cloud/linode/node_controller.go | 38 ++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/cloud/linode/node_controller.go b/cloud/linode/node_controller.go index 62f74edb..16749dec 100644 --- a/cloud/linode/node_controller.go +++ b/cloud/linode/node_controller.go @@ -28,6 +28,11 @@ const ( defaultMetadataTTL = 300 * time.Second ) +type nodeRequest struct { + node *v1.Node + timestamp time.Time +} + type nodeController struct { sync.RWMutex @@ -40,6 +45,9 @@ type nodeController struct { ttl time.Duration queue workqueue.TypedDelayingInterface[any] + + nodeLastAdded map[string]time.Time + lock sync.Mutex } func newNodeController(kubeclient kubernetes.Interface, client client.Client, informer v1informers.NodeInformer, instanceCache *instances) *nodeController { @@ -58,6 +66,7 @@ func newNodeController(kubeclient kubernetes.Interface, client client.Client, in ttl: timeout, metadataLastUpdate: make(map[string]time.Time), queue: workqueue.NewTypedDelayingQueueWithConfig[any](workqueue.TypedDelayingQueueConfig[any]{Name: "ccm_node"}), + nodeLastAdded: make(map[string]time.Time), } } @@ -71,7 +80,10 @@ func (s *nodeController) Run(stopCh <-chan struct{}) { } klog.Infof("NodeController will handle newly created node (%s) metadata", node.Name) - s.queue.Add(node) + s.lock.Lock() + defer s.lock.Unlock() + s.nodeLastAdded[node.Name] = time.Now() + s.queue.Add(nodeRequest{node: node, timestamp: time.Now()}) }, UpdateFunc: func(oldObj, newObj interface{}) { node, ok := newObj.(*v1.Node) @@ -80,7 +92,10 @@ func (s *nodeController) Run(stopCh <-chan struct{}) { } klog.Infof("NodeController will handle newly updated node (%s) metadata", node.Name) - s.queue.Add(node) + s.lock.Lock() + defer s.lock.Unlock() + s.nodeLastAdded[node.Name] = time.Now() + s.queue.Add(nodeRequest{node: node, timestamp: time.Now()}) }, }, informerResyncPeriod, @@ -106,25 +121,32 @@ func (s *nodeController) processNext() bool { } defer s.queue.Done(key) - node, ok := key.(*v1.Node) + request, ok := key.(nodeRequest) if !ok { - klog.Errorf("expected dequeued key to be of type *v1.Node but got %T", node) + klog.Errorf("expected dequeued key to be of type nodeRequest but got %T", request) return true } - err := s.handleNode(context.TODO(), node) + s.lock.Lock() + latestTimestamp, exists := s.nodeLastAdded[request.node.Name] + s.lock.Unlock() + if !exists || request.timestamp.Before(latestTimestamp) { + klog.V(3).InfoS("Skipping node metadata update as its not the most recent object", "node", klog.KObj(request.node)) + return true + } + err := s.handleNode(context.TODO(), request.node) switch deleteErr := err.(type) { case nil: break case *linodego.Error: if deleteErr.Code >= http.StatusInternalServerError || deleteErr.Code == http.StatusTooManyRequests { - klog.Errorf("failed to add metadata for node (%s); retrying in 1 minute: %s", node.Name, err) - s.queue.AddAfter(node, retryInterval) + klog.Errorf("failed to add metadata for node (%s); retrying in 1 minute: %s", request.node.Name, err) + s.queue.AddAfter(request, retryInterval) } default: - klog.Errorf("failed to add metadata for node (%s); will not retry: %s", node.Name, err) + klog.Errorf("failed to add metadata for node (%s); will not retry: %s", request.node.Name, err) } return true } From 1912d419dd5cfee52423d02fa9f5a04d87a7e9b2 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Sun, 9 Mar 2025 01:39:17 +0000 Subject: [PATCH 2/3] fix unittests --- cloud/linode/instances_test.go | 1 - cloud/linode/node_controller.go | 38 ++++++++++++---------------- cloud/linode/node_controller_test.go | 28 ++++++++------------ 3 files changed, 26 insertions(+), 41 deletions(-) diff --git a/cloud/linode/instances_test.go b/cloud/linode/instances_test.go index a1cd328e..cc88d8c4 100644 --- a/cloud/linode/instances_test.go +++ b/cloud/linode/instances_test.go @@ -17,7 +17,6 @@ import ( cloudprovider "k8s.io/cloud-provider" "github.com/linode/linode-cloud-controller-manager/cloud/linode/client/mocks" - ) const ( diff --git a/cloud/linode/node_controller.go b/cloud/linode/node_controller.go index 16749dec..08ad8b94 100644 --- a/cloud/linode/node_controller.go +++ b/cloud/linode/node_controller.go @@ -44,10 +44,8 @@ type nodeController struct { metadataLastUpdate map[string]time.Time ttl time.Duration - queue workqueue.TypedDelayingInterface[any] - + queue workqueue.TypedDelayingInterface[nodeRequest] nodeLastAdded map[string]time.Time - lock sync.Mutex } func newNodeController(kubeclient kubernetes.Interface, client client.Client, informer v1informers.NodeInformer, instanceCache *instances) *nodeController { @@ -65,7 +63,7 @@ func newNodeController(kubeclient kubernetes.Interface, client client.Client, in informer: informer, ttl: timeout, metadataLastUpdate: make(map[string]time.Time), - queue: workqueue.NewTypedDelayingQueueWithConfig[any](workqueue.TypedDelayingQueueConfig[any]{Name: "ccm_node"}), + queue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "ccm_node"}), nodeLastAdded: make(map[string]time.Time), } } @@ -80,10 +78,7 @@ func (s *nodeController) Run(stopCh <-chan struct{}) { } klog.Infof("NodeController will handle newly created node (%s) metadata", node.Name) - s.lock.Lock() - defer s.lock.Unlock() - s.nodeLastAdded[node.Name] = time.Now() - s.queue.Add(nodeRequest{node: node, timestamp: time.Now()}) + s.addNodeToQueue(node) }, UpdateFunc: func(oldObj, newObj interface{}) { node, ok := newObj.(*v1.Node) @@ -92,10 +87,7 @@ func (s *nodeController) Run(stopCh <-chan struct{}) { } klog.Infof("NodeController will handle newly updated node (%s) metadata", node.Name) - s.lock.Lock() - defer s.lock.Unlock() - s.nodeLastAdded[node.Name] = time.Now() - s.queue.Add(nodeRequest{node: node, timestamp: time.Now()}) + s.addNodeToQueue(node) }, }, informerResyncPeriod, @@ -107,6 +99,14 @@ func (s *nodeController) Run(stopCh <-chan struct{}) { s.informer.Informer().Run(stopCh) } +// addNodeToQueue adds a node to the queue for processing. +func (s *nodeController) addNodeToQueue(node *v1.Node) { + s.Lock() + defer s.Unlock() + s.nodeLastAdded[node.Name] = time.Now() + s.queue.Add(nodeRequest{node: node, timestamp: time.Now()}) +} + // worker runs a worker thread that dequeues new or modified nodes and processes // metadata (host UUID) on each of them. func (s *nodeController) worker() { @@ -115,21 +115,15 @@ func (s *nodeController) worker() { } func (s *nodeController) processNext() bool { - key, quit := s.queue.Get() + request, quit := s.queue.Get() if quit { return false } - defer s.queue.Done(key) + defer s.queue.Done(request) - request, ok := key.(nodeRequest) - if !ok { - klog.Errorf("expected dequeued key to be of type nodeRequest but got %T", request) - return true - } - - s.lock.Lock() + s.RLock() latestTimestamp, exists := s.nodeLastAdded[request.node.Name] - s.lock.Unlock() + s.RUnlock() if !exists || request.timestamp.Before(latestTimestamp) { klog.V(3).InfoS("Skipping node metadata update as its not the most recent object", "node", klog.KObj(request.node)) return true diff --git a/cloud/linode/node_controller_test.go b/cloud/linode/node_controller_test.go index 65e093a5..80fa8bbf 100644 --- a/cloud/linode/node_controller_test.go +++ b/cloud/linode/node_controller_test.go @@ -28,7 +28,7 @@ func TestNodeController_Run(t *testing.T) { client := mocks.NewMockClient(ctrl) kubeClient := fake.NewSimpleClientset() informer := informers.NewSharedInformerFactory(kubeClient, 0).Core().V1().Nodes() - mockQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "test"}) + mockQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "test"}) nodeCtrl := newNodeController(kubeClient, client, informer, newInstances(client)) nodeCtrl.queue = mockQueue @@ -68,7 +68,7 @@ func TestNodeController_processNext(t *testing.T) { defer ctrl.Finish() client := mocks.NewMockClient(ctrl) kubeClient := fake.NewSimpleClientset() - queue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "testQueue"}) + queue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "testQueue"}) node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "test", @@ -87,10 +87,11 @@ func TestNodeController_processNext(t *testing.T) { queue: queue, metadataLastUpdate: make(map[string]time.Time), ttl: defaultMetadataTTL, + nodeLastAdded: make(map[string]time.Time), } t.Run("should return no error on unknown errors", func(t *testing.T) { - queue.Add(node) + controller.addNodeToQueue(node) client.EXPECT().ListInstances(gomock.Any(), nil).Times(1).Return([]linodego.Instance{}, errors.New("lookup failed")) result := controller.processNext() assert.True(t, result, "processNext should return true") @@ -100,7 +101,7 @@ func TestNodeController_processNext(t *testing.T) { }) t.Run("should return no error if node exists", func(t *testing.T) { - queue.Add(node) + controller.addNodeToQueue(node) publicIP := net.ParseIP("172.234.31.123") privateIP := net.ParseIP("192.168.159.135") client.EXPECT().ListInstances(gomock.Any(), nil).Times(1).Return([]linodego.Instance{ @@ -113,17 +114,8 @@ func TestNodeController_processNext(t *testing.T) { } }) - t.Run("should return no error if queued object is not of type Node", func(t *testing.T) { - queue.Add("abc") - result := controller.processNext() - assert.True(t, result, "processNext should return true") - if queue.Len() != 0 { - t.Errorf("expected queue to be empty, got %d items", queue.Len()) - } - }) - t.Run("should return no error if node in k8s doesn't exist", func(t *testing.T) { - queue.Add(node) + controller.addNodeToQueue(node) controller.kubeclient = fake.NewSimpleClientset() defer func() { controller.kubeclient = kubeClient }() result := controller.processNext() @@ -134,9 +126,9 @@ func TestNodeController_processNext(t *testing.T) { }) t.Run("should return error and requeue when it gets 429 from linode API", func(t *testing.T) { - queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "testQueue1"}) - queue.Add(node) + queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "testQueue1"}) controller.queue = queue + controller.addNodeToQueue(node) client := mocks.NewMockClient(ctrl) controller.instances = newInstances(client) retryInterval = 1 * time.Nanosecond @@ -150,9 +142,9 @@ func TestNodeController_processNext(t *testing.T) { }) t.Run("should return error and requeue when it gets error >= 500 from linode API", func(t *testing.T) { - queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "testQueue2"}) - queue.Add(node) + queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "testQueue2"}) controller.queue = queue + controller.addNodeToQueue(node) client := mocks.NewMockClient(ctrl) controller.instances = newInstances(client) retryInterval = 1 * time.Nanosecond From e462d7b82b7d19829ddeaa437b35a8de7d2357e1 Mon Sep 17 00:00:00 2001 From: Rahul Sharma Date: Sun, 9 Mar 2025 02:28:04 +0000 Subject: [PATCH 3/3] add unittest to test processing older node requests --- cloud/linode/node_controller.go | 5 +++-- cloud/linode/node_controller_test.go | 10 ++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/cloud/linode/node_controller.go b/cloud/linode/node_controller.go index 08ad8b94..c817ef37 100644 --- a/cloud/linode/node_controller.go +++ b/cloud/linode/node_controller.go @@ -103,8 +103,9 @@ func (s *nodeController) Run(stopCh <-chan struct{}) { func (s *nodeController) addNodeToQueue(node *v1.Node) { s.Lock() defer s.Unlock() - s.nodeLastAdded[node.Name] = time.Now() - s.queue.Add(nodeRequest{node: node, timestamp: time.Now()}) + currTime := time.Now() + s.nodeLastAdded[node.Name] = currTime + s.queue.Add(nodeRequest{node: node, timestamp: currTime}) } // worker runs a worker thread that dequeues new or modified nodes and processes diff --git a/cloud/linode/node_controller_test.go b/cloud/linode/node_controller_test.go index 80fa8bbf..05619bdb 100644 --- a/cloud/linode/node_controller_test.go +++ b/cloud/linode/node_controller_test.go @@ -100,6 +100,16 @@ func TestNodeController_processNext(t *testing.T) { } }) + t.Run("should return no error if timestamp for node being processed is older than the most recent request", func(t *testing.T) { + controller.addNodeToQueue(node) + controller.nodeLastAdded["test"] = time.Now().Add(controller.ttl) + result := controller.processNext() + assert.True(t, result, "processNext should return true") + if queue.Len() != 0 { + t.Errorf("expected queue to be empty, got %d items", queue.Len()) + } + }) + t.Run("should return no error if node exists", func(t *testing.T) { controller.addNodeToQueue(node) publicIP := net.ParseIP("172.234.31.123")