Skip to content

Commit 7ec308c

Browse files
Rahul Sharmarahulait
authored andcommitted
use latest node object from workqueue when processing the queue
1 parent 3ead343 commit 7ec308c

File tree

1 file changed

+30
-8
lines changed

1 file changed

+30
-8
lines changed

cloud/linode/node_controller.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ const (
2828
defaultMetadataTTL = 300 * time.Second
2929
)
3030

31+
type nodeRequest struct {
32+
node *v1.Node
33+
timestamp time.Time
34+
}
35+
3136
type nodeController struct {
3237
sync.RWMutex
3338

@@ -40,6 +45,9 @@ type nodeController struct {
4045
ttl time.Duration
4146

4247
queue workqueue.TypedDelayingInterface[any]
48+
49+
nodeLastAdded map[string]time.Time
50+
lock sync.Mutex
4351
}
4452

4553
func newNodeController(kubeclient kubernetes.Interface, client client.Client, informer v1informers.NodeInformer, instanceCache *instances) *nodeController {
@@ -58,6 +66,7 @@ func newNodeController(kubeclient kubernetes.Interface, client client.Client, in
5866
ttl: timeout,
5967
metadataLastUpdate: make(map[string]time.Time),
6068
queue: workqueue.NewTypedDelayingQueueWithConfig[any](workqueue.TypedDelayingQueueConfig[any]{Name: "ccm_node"}),
69+
nodeLastAdded: make(map[string]time.Time),
6170
}
6271
}
6372

@@ -71,7 +80,10 @@ func (s *nodeController) Run(stopCh <-chan struct{}) {
7180
}
7281

7382
klog.Infof("NodeController will handle newly created node (%s) metadata", node.Name)
74-
s.queue.Add(node)
83+
s.lock.Lock()
84+
defer s.lock.Unlock()
85+
s.nodeLastAdded[node.Name] = time.Now()
86+
s.queue.Add(nodeRequest{node: node, timestamp: time.Now()})
7587
},
7688
UpdateFunc: func(oldObj, newObj interface{}) {
7789
node, ok := newObj.(*v1.Node)
@@ -80,7 +92,10 @@ func (s *nodeController) Run(stopCh <-chan struct{}) {
8092
}
8193

8294
klog.Infof("NodeController will handle newly updated node (%s) metadata", node.Name)
83-
s.queue.Add(node)
95+
s.lock.Lock()
96+
defer s.lock.Unlock()
97+
s.nodeLastAdded[node.Name] = time.Now()
98+
s.queue.Add(nodeRequest{node: node, timestamp: time.Now()})
8499
},
85100
},
86101
informerResyncPeriod,
@@ -106,25 +121,32 @@ func (s *nodeController) processNext() bool {
106121
}
107122
defer s.queue.Done(key)
108123

109-
node, ok := key.(*v1.Node)
124+
request, ok := key.(nodeRequest)
110125
if !ok {
111-
klog.Errorf("expected dequeued key to be of type *v1.Node but got %T", node)
126+
klog.Errorf("expected dequeued key to be of type nodeRequest but got %T", request)
112127
return true
113128
}
114129

115-
err := s.handleNode(context.TODO(), node)
130+
s.lock.Lock()
131+
latestTimestamp, exists := s.nodeLastAdded[request.node.Name]
132+
s.lock.Unlock()
133+
if !exists || request.timestamp.Before(latestTimestamp) {
134+
klog.V(3).InfoS("Skipping node metadata update as its not the most recent object", "node", klog.KObj(request.node))
135+
return true
136+
}
137+
err := s.handleNode(context.TODO(), request.node)
116138
switch deleteErr := err.(type) {
117139
case nil:
118140
break
119141

120142
case *linodego.Error:
121143
if deleteErr.Code >= http.StatusInternalServerError || deleteErr.Code == http.StatusTooManyRequests {
122-
klog.Errorf("failed to add metadata for node (%s); retrying in 1 minute: %s", node.Name, err)
123-
s.queue.AddAfter(node, retryInterval)
144+
klog.Errorf("failed to add metadata for node (%s); retrying in 1 minute: %s", request.node.Name, err)
145+
s.queue.AddAfter(request, retryInterval)
124146
}
125147

126148
default:
127-
klog.Errorf("failed to add metadata for node (%s); will not retry: %s", node.Name, err)
149+
klog.Errorf("failed to add metadata for node (%s); will not retry: %s", request.node.Name, err)
128150
}
129151
return true
130152
}

0 commit comments

Comments
 (0)