diff --git a/cloud/linode/instances_test.go b/cloud/linode/instances_test.go index a1cd328e..cc88d8c4 100644 --- a/cloud/linode/instances_test.go +++ b/cloud/linode/instances_test.go @@ -17,7 +17,6 @@ import ( cloudprovider "k8s.io/cloud-provider" "github.com/linode/linode-cloud-controller-manager/cloud/linode/client/mocks" - ) const ( diff --git a/cloud/linode/node_controller.go b/cloud/linode/node_controller.go index 62f74edb..c817ef37 100644 --- a/cloud/linode/node_controller.go +++ b/cloud/linode/node_controller.go @@ -28,6 +28,11 @@ const ( defaultMetadataTTL = 300 * time.Second ) +type nodeRequest struct { + node *v1.Node + timestamp time.Time +} + type nodeController struct { sync.RWMutex @@ -39,7 +44,8 @@ type nodeController struct { metadataLastUpdate map[string]time.Time ttl time.Duration - queue workqueue.TypedDelayingInterface[any] + queue workqueue.TypedDelayingInterface[nodeRequest] + nodeLastAdded map[string]time.Time } func newNodeController(kubeclient kubernetes.Interface, client client.Client, informer v1informers.NodeInformer, instanceCache *instances) *nodeController { @@ -57,7 +63,8 @@ func newNodeController(kubeclient kubernetes.Interface, client client.Client, in informer: informer, ttl: timeout, metadataLastUpdate: make(map[string]time.Time), - queue: workqueue.NewTypedDelayingQueueWithConfig[any](workqueue.TypedDelayingQueueConfig[any]{Name: "ccm_node"}), + queue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "ccm_node"}), + nodeLastAdded: make(map[string]time.Time), } } @@ -71,7 +78,7 @@ func (s *nodeController) Run(stopCh <-chan struct{}) { } klog.Infof("NodeController will handle newly created node (%s) metadata", node.Name) - s.queue.Add(node) + s.addNodeToQueue(node) }, UpdateFunc: func(oldObj, newObj interface{}) { node, ok := newObj.(*v1.Node) @@ -80,7 +87,7 @@ func (s *nodeController) Run(stopCh <-chan struct{}) { } klog.Infof("NodeController will handle newly updated node (%s) metadata", node.Name) - s.queue.Add(node) + s.addNodeToQueue(node) }, }, informerResyncPeriod, @@ -92,6 +99,15 @@ func (s *nodeController) Run(stopCh <-chan struct{}) { s.informer.Informer().Run(stopCh) } +// addNodeToQueue adds a node to the queue for processing. +func (s *nodeController) addNodeToQueue(node *v1.Node) { + s.Lock() + defer s.Unlock() + currTime := time.Now() + s.nodeLastAdded[node.Name] = currTime + s.queue.Add(nodeRequest{node: node, timestamp: currTime}) +} + // worker runs a worker thread that dequeues new or modified nodes and processes // metadata (host UUID) on each of them. func (s *nodeController) worker() { @@ -100,31 +116,32 @@ func (s *nodeController) worker() { } func (s *nodeController) processNext() bool { - key, quit := s.queue.Get() + request, quit := s.queue.Get() if quit { return false } - defer s.queue.Done(key) + defer s.queue.Done(request) - node, ok := key.(*v1.Node) - if !ok { - klog.Errorf("expected dequeued key to be of type *v1.Node but got %T", node) + s.RLock() + latestTimestamp, exists := s.nodeLastAdded[request.node.Name] + s.RUnlock() + if !exists || request.timestamp.Before(latestTimestamp) { + klog.V(3).InfoS("Skipping node metadata update as its not the most recent object", "node", klog.KObj(request.node)) return true } - - err := s.handleNode(context.TODO(), node) + err := s.handleNode(context.TODO(), request.node) switch deleteErr := err.(type) { case nil: break case *linodego.Error: if deleteErr.Code >= http.StatusInternalServerError || deleteErr.Code == http.StatusTooManyRequests { - klog.Errorf("failed to add metadata for node (%s); retrying in 1 minute: %s", node.Name, err) - s.queue.AddAfter(node, retryInterval) + klog.Errorf("failed to add metadata for node (%s); retrying in 1 minute: %s", request.node.Name, err) + s.queue.AddAfter(request, retryInterval) } default: - klog.Errorf("failed to add metadata for node (%s); will not retry: %s", node.Name, err) + klog.Errorf("failed to add metadata for node (%s); will not retry: %s", request.node.Name, err) } return true } diff --git a/cloud/linode/node_controller_test.go b/cloud/linode/node_controller_test.go index 65e093a5..05619bdb 100644 --- a/cloud/linode/node_controller_test.go +++ b/cloud/linode/node_controller_test.go @@ -28,7 +28,7 @@ func TestNodeController_Run(t *testing.T) { client := mocks.NewMockClient(ctrl) kubeClient := fake.NewSimpleClientset() informer := informers.NewSharedInformerFactory(kubeClient, 0).Core().V1().Nodes() - mockQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "test"}) + mockQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "test"}) nodeCtrl := newNodeController(kubeClient, client, informer, newInstances(client)) nodeCtrl.queue = mockQueue @@ -68,7 +68,7 @@ func TestNodeController_processNext(t *testing.T) { defer ctrl.Finish() client := mocks.NewMockClient(ctrl) kubeClient := fake.NewSimpleClientset() - queue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "testQueue"}) + queue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "testQueue"}) node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "test", @@ -87,10 +87,11 @@ func TestNodeController_processNext(t *testing.T) { queue: queue, metadataLastUpdate: make(map[string]time.Time), ttl: defaultMetadataTTL, + nodeLastAdded: make(map[string]time.Time), } t.Run("should return no error on unknown errors", func(t *testing.T) { - queue.Add(node) + controller.addNodeToQueue(node) client.EXPECT().ListInstances(gomock.Any(), nil).Times(1).Return([]linodego.Instance{}, errors.New("lookup failed")) result := controller.processNext() assert.True(t, result, "processNext should return true") @@ -99,13 +100,9 @@ func TestNodeController_processNext(t *testing.T) { } }) - t.Run("should return no error if node exists", func(t *testing.T) { - queue.Add(node) - publicIP := net.ParseIP("172.234.31.123") - privateIP := net.ParseIP("192.168.159.135") - client.EXPECT().ListInstances(gomock.Any(), nil).Times(1).Return([]linodego.Instance{ - {ID: 111, Label: "test", IPv4: []*net.IP{&publicIP, &privateIP}, HostUUID: "111"}, - }, nil) + t.Run("should return no error if timestamp for node being processed is older than the most recent request", func(t *testing.T) { + controller.addNodeToQueue(node) + controller.nodeLastAdded["test"] = time.Now().Add(controller.ttl) result := controller.processNext() assert.True(t, result, "processNext should return true") if queue.Len() != 0 { @@ -113,8 +110,13 @@ func TestNodeController_processNext(t *testing.T) { } }) - t.Run("should return no error if queued object is not of type Node", func(t *testing.T) { - queue.Add("abc") + t.Run("should return no error if node exists", func(t *testing.T) { + controller.addNodeToQueue(node) + publicIP := net.ParseIP("172.234.31.123") + privateIP := net.ParseIP("192.168.159.135") + client.EXPECT().ListInstances(gomock.Any(), nil).Times(1).Return([]linodego.Instance{ + {ID: 111, Label: "test", IPv4: []*net.IP{&publicIP, &privateIP}, HostUUID: "111"}, + }, nil) result := controller.processNext() assert.True(t, result, "processNext should return true") if queue.Len() != 0 { @@ -123,7 +125,7 @@ func TestNodeController_processNext(t *testing.T) { }) t.Run("should return no error if node in k8s doesn't exist", func(t *testing.T) { - queue.Add(node) + controller.addNodeToQueue(node) controller.kubeclient = fake.NewSimpleClientset() defer func() { controller.kubeclient = kubeClient }() result := controller.processNext() @@ -134,9 +136,9 @@ func TestNodeController_processNext(t *testing.T) { }) t.Run("should return error and requeue when it gets 429 from linode API", func(t *testing.T) { - queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "testQueue1"}) - queue.Add(node) + queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "testQueue1"}) controller.queue = queue + controller.addNodeToQueue(node) client := mocks.NewMockClient(ctrl) controller.instances = newInstances(client) retryInterval = 1 * time.Nanosecond @@ -150,9 +152,9 @@ func TestNodeController_processNext(t *testing.T) { }) t.Run("should return error and requeue when it gets error >= 500 from linode API", func(t *testing.T) { - queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "testQueue2"}) - queue.Add(node) + queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "testQueue2"}) controller.queue = queue + controller.addNodeToQueue(node) client := mocks.NewMockClient(ctrl) controller.instances = newInstances(client) retryInterval = 1 * time.Nanosecond