
Commit 4ba09a8

cloud node controller: implement with workqueues and node lister

1 parent 3d52b8b commit 4ba09a8

File tree

3 files changed: +178 −90 lines changed
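At a glance, the commit moves the cloud node controller from per-event callbacks to the standard client-go controller pattern: informer event handlers only enqueue object keys onto a rate-limited workqueue, and a worker goroutine pops keys, syncs the object, and requeues with backoff on failure. Below is a minimal standalone sketch of that queue discipline, not the controller's actual code; the key "node-1" and the helper syncKey are illustrative stand-ins for an informer-delivered key and the controller's syncHandler.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// syncKey stands in for the controller's real sync handler.
func syncKey(key string) error {
	fmt.Println("syncing", key)
	return nil
}

func main() {
	// Same constructor the commit uses: a named, rate-limited queue whose
	// failed items are retried with exponential backoff.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nodes")

	// In the controller, informer event handlers call Add with an object key.
	queue.Add("node-1")

	obj, shutdown := queue.Get()
	if shutdown {
		return
	}
	key := obj.(string)
	if err := syncKey(key); err != nil {
		queue.AddRateLimited(key) // retry later with backoff
	} else {
		queue.Forget(key) // clear the item's retry history
	}
	queue.Done(obj) // mark the item as processed
	queue.ShutDown()
}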

staging/src/k8s.io/cloud-provider/controllers/node/BUILD

Lines changed: 2 additions & 0 deletions

@@ -17,9 +17,11 @@ go_library(
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
+        "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
         "//staging/src/k8s.io/client-go/util/retry:go_default_library",
+        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//staging/src/k8s.io/cloud-provider:go_default_library",
         "//staging/src/k8s.io/cloud-provider/api:go_default_library",
         "//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library",

staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go

Lines changed: 156 additions & 85 deletions
@@ -22,7 +22,7 @@ import (
 	"fmt"
 	"time"

-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -32,9 +32,11 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	clientretry "k8s.io/client-go/util/retry"
+	"k8s.io/client-go/util/workqueue"
 	cloudprovider "k8s.io/cloud-provider"
 	cloudproviderapi "k8s.io/cloud-provider/api"
 	cloudnodeutil "k8s.io/cloud-provider/node/helpers"
@@ -84,6 +86,7 @@ var UpdateNodeSpecBackoff = wait.Backoff{
 	Jitter: 1.0,
 }

+// CloudNodeController is the controller implementation for Node resources
 type CloudNodeController struct {
 	nodeInformer coreinformers.NodeInformer
 	kubeClient   clientset.Interface
@@ -92,6 +95,10 @@ type CloudNodeController struct {
 	cloud cloudprovider.Interface

 	nodeStatusUpdateFrequency time.Duration
+
+	nodesLister corelisters.NodeLister
+	nodesSynced cache.InformerSynced
+	workqueue   workqueue.RateLimitingInterface
 }

 // NewCloudNodeController creates a CloudNodeController object
@@ -120,38 +127,112 @@ func NewCloudNodeController(
 		recorder:                  recorder,
 		cloud:                     cloud,
 		nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
+		nodesLister:               nodeInformer.Lister(),
+		nodesSynced:               nodeInformer.Informer().HasSynced,
+		workqueue:                 workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nodes"),
 	}

 	// Use shared informer to listen to add/update of nodes. Note that any nodes
 	// that exist before node controller starts will show up in the update method
 	cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    func(obj interface{}) { cnc.AddCloudNode(context.TODO(), obj) },
-		UpdateFunc: func(oldObj, newObj interface{}) { cnc.UpdateCloudNode(context.TODO(), oldObj, newObj) },
+		AddFunc:    cnc.enqueueNode,
+		UpdateFunc: func(oldObj, newObj interface{}) { cnc.enqueueNode(newObj) },
 	})

 	return cnc, nil
 }

+// Run will sync informer caches and start workers.
 // This controller updates newly registered nodes with information
 // from the cloud provider. This call is blocking so should be called
 // via a goroutine
 func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer cnc.workqueue.ShutDown()
+
+	// Wait for the caches to be synced before starting workers
+	klog.Info("Waiting for informer caches to sync")
+	if ok := cache.WaitForCacheSync(stopCh, cnc.nodesSynced); !ok {
+		klog.Errorf("failed to wait for caches to sync")
+		return
+	}

-	// The following loops run communicate with the APIServer with a worst case complexity
+	// The periodic loop for UpdateNodeStatus communicates with the APIServer with a worst case complexity
 	// of O(num_nodes) per cycle. These functions are justified here because these events fire
 	// very infrequently. DO NOT MODIFY this to perform frequent operations.
+	go wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh)
+	go wait.Until(cnc.runWorker, time.Second, stopCh)
+
+	<-stopCh
+}
+
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// workqueue.
+func (cnc *CloudNodeController) runWorker() {
+	for cnc.processNextWorkItem() {
+	}
+}
+
+// processNextWorkItem will read a single work item off the workqueue and
+// attempt to process it, by calling the syncHandler.
+func (cnc *CloudNodeController) processNextWorkItem() bool {
+	obj, shutdown := cnc.workqueue.Get()
+	if shutdown {
+		return false
+	}
+
+	// We wrap this block in a func so we can defer cnc.workqueue.Done.
+	err := func(obj interface{}) error {
+		defer cnc.workqueue.Done(obj)
+
+		var key string
+		var ok bool
+		if key, ok = obj.(string); !ok {
+			cnc.workqueue.Forget(obj)
+			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
+			return nil
+		}
+
+		// Run the syncHandler, passing it the key of the
+		// Node resource to be synced.
+		if err := cnc.syncHandler(key); err != nil {
+			// Put the item back on the workqueue to handle any transient errors.
+			cnc.workqueue.AddRateLimited(key)
+			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
+		}
+
+		// Finally, if no error occurs we Forget this item so it does not
+		// get queued again until another change happens.
+		cnc.workqueue.Forget(obj)
+		return nil
+	}(obj)
+
+	if err != nil {
+		utilruntime.HandleError(err)
+		return true
+	}
+
+	return true
+}
+
+// syncHandler implements the logic of the controller.
+func (cnc *CloudNodeController) syncHandler(key string) error {
+	_, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
+		return nil
+	}

-	// Start a loop to periodically update the node addresses obtained from the cloud
-	wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh)
+	return cnc.syncNode(context.TODO(), name)
 }

 // UpdateNodeStatus updates the node status, such as node addresses
-func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
+func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error {
 	nodes, err := cnc.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"})
 	if err != nil {
 		klog.Errorf("Error monitoring node status: %v", err)
-		return
+		return err
 	}

 	for i := range nodes.Items {
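Run now drives two loops with wait.Until: the periodic UpdateNodeStatus pass at nodeStatusUpdateFrequency, and runWorker re-invoked every second should it ever return. A small standalone sketch of the wait.Until semantics the code relies on (the printed message and durations are illustrative):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// wait.Until calls the function immediately, then again every period,
	// until stopCh is closed -- so a returning runWorker is simply restarted.
	go wait.Until(func() { fmt.Println("tick") }, 100*time.Millisecond, stopCh)

	time.Sleep(350 * time.Millisecond)
	close(stopCh) // stops the loop, just as closing Run's stopCh would
}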
@@ -169,6 +250,20 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
 			klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
 		}
 	}
+
+	return nil
+}
+
+// enqueueNode takes a Node resource and converts it into a key
+// string which is then put onto the work queue.
+func (cnc *CloudNodeController) enqueueNode(obj interface{}) {
+	var key string
+	var err error
+	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+		utilruntime.HandleError(err)
+		return
+	}
+	cnc.workqueue.Add(key)
 }

 // reconcileNodeLabels reconciles node labels transitioning from beta to GA
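enqueueNode and syncHandler are two halves of the same convention: cache.MetaNamespaceKeyFunc flattens an object to a "namespace/name" key, and cache.SplitMetaNamespaceKey reverses it. Node is cluster-scoped, so the namespace half is empty, which is why syncHandler discards the first return value. A standalone illustration (the node name is hypothetical):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

	// Cluster-scoped objects have no namespace, so the key is just the name.
	key, err := cache.MetaNamespaceKeyFunc(node)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // node-1

	// syncHandler discards the (empty) namespace and keeps the name.
	ns, name, _ := cache.SplitMetaNamespaceKey(key)
	fmt.Printf("namespace=%q name=%q\n", ns, name) // namespace="" name="node-1"
}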
@@ -273,122 +368,98 @@ func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.
 // in a retry-if-conflict loop.
 type nodeModifier func(*v1.Node)

-func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) {
-	node, ok := newObj.(*v1.Node)
-	if !ok {
-		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
-		return
-	}
-
-	cloudTaint := getCloudTaint(node.Spec.Taints)
-	if cloudTaint == nil {
-		// The node has already been initialized so nothing to do.
-		return
-	}
-
-	cnc.initializeNode(ctx, node)
-}
-
-// AddCloudNode handles initializing new nodes registered with the cloud taint.
-func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{}) {
-	node := obj.(*v1.Node)
-
-	cloudTaint := getCloudTaint(node.Spec.Taints)
-	if cloudTaint == nil {
-		klog.V(2).Infof("This node %s is registered without the cloud taint. Will not process.", node.Name)
-		return
-	}
-
-	cnc.initializeNode(ctx, node)
-}
-
-// This processes nodes that were added into the cluster, and cloud initialize them if appropriate
-func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) {
-	klog.Infof("Initializing node %s with cloud provider", node.Name)
-
-	err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
-		// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
-		// Since there are node taints, do we still need this?
-		// This condition marks the node as unusable until routes are initialized in the cloud provider
-		if cnc.cloud.ProviderName() == "gce" {
-			if err := cloudnodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(node.Name), v1.NodeCondition{
-				Type:               v1.NodeNetworkUnavailable,
-				Status:             v1.ConditionTrue,
-				Reason:             "NoRouteCreated",
-				Message:            "Node created without a route",
-				LastTransitionTime: metav1.Now(),
-			}); err != nil {
-				return err
-			}
-		}
-		return nil
-	})
+// syncNode handles updating existing nodes registered with the cloud taint,
+// and processes nodes that were added to the cluster, initializing them with
+// the cloud provider if appropriate.
+func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) error {
+	curNode, err := cnc.nodeInformer.Lister().Get(nodeName)
 	if err != nil {
-		utilruntime.HandleError(err)
-		return
-	}
+		if apierrors.IsNotFound(err) {
+			return nil
+		}

-	curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
-	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("failed to get node %s: %v", node.Name, err))
-		return
+		return err
 	}

 	cloudTaint := getCloudTaint(curNode.Spec.Taints)
 	if cloudTaint == nil {
 		// Node object received from event had the cloud taint but was outdated,
-		// the node has actually already been initialized.
-		return
+		// the node has actually already been initialized, so this sync event can be ignored.
+		return nil
 	}

-	providerID, err := cnc.getProviderID(ctx, curNode)
+	klog.Infof("Initializing node %s with cloud provider", nodeName)
+
+	copyNode := curNode.DeepCopy()
+	providerID, err := cnc.getProviderID(ctx, copyNode)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("failed to get provider ID for node %s at cloudprovider: %v", node.Name, err))
-		return
+		return fmt.Errorf("failed to get provider ID for node %s at cloudprovider: %v", nodeName, err)
 	}

-	instanceMetadata, err := cnc.getInstanceMetadata(ctx, providerID, curNode)
+	instanceMetadata, err := cnc.getInstanceMetadata(ctx, providerID, copyNode)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("failed to get instance metadata for node %s: %v", node.Name, err))
-		return
+		return fmt.Errorf("failed to get instance metadata for node %s: %v", nodeName, err)
 	}

-	nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, providerID, curNode, instanceMetadata)
+	nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, providerID, copyNode, instanceMetadata)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("failed to initialize node %s at cloudprovider: %v", node.Name, err))
-		return
+		return fmt.Errorf("failed to get node modifiers from cloud provider: %v", err)
 	}

 	nodeModifiers = append(nodeModifiers, func(n *v1.Node) {
 		n.Spec.Taints = excludeCloudTaint(n.Spec.Taints)
 	})

 	err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
-		curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
-		if err != nil {
-			return err
+		var curNode *v1.Node
+		if cnc.cloud.ProviderName() == "gce" {
+			// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
+			// Since there are node taints, do we still need this?
+			// This condition marks the node as unusable until routes are initialized in the cloud provider
+			if err := cloudnodeutil.SetNodeCondition(cnc.kubeClient, types.NodeName(nodeName), v1.NodeCondition{
+				Type:               v1.NodeNetworkUnavailable,
+				Status:             v1.ConditionTrue,
+				Reason:             "NoRouteCreated",
+				Message:            "Node created without a route",
+				LastTransitionTime: metav1.Now(),
+			}); err != nil {
+				return err
+			}
+
+			// fetch latest node from API server since GCE-specific condition was set and informer cache may be stale
+			curNode, err = cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+			if err != nil {
+				return err
+			}
+		} else {
+			curNode, err = cnc.nodeInformer.Lister().Get(nodeName)
+			if err != nil {
+				return err
+			}
 		}

+		newNode := curNode.DeepCopy()
 		for _, modify := range nodeModifiers {
-			modify(curNode)
+			modify(newNode)
 		}

-		_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), curNode, metav1.UpdateOptions{})
+		_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
 		if err != nil {
 			return err
 		}

 		// After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses
 		// So that users do not see any significant delay in IP addresses being filled into the node
-		cnc.updateNodeAddress(ctx, curNode, instanceMetadata)
+		cnc.updateNodeAddress(ctx, newNode, instanceMetadata)

-		klog.Infof("Successfully initialized node %s with cloud provider", node.Name)
+		klog.Infof("Successfully initialized node %s with cloud provider", nodeName)
 		return nil
 	})
 	if err != nil {
-		utilruntime.HandleError(err)
-		return
+		return err
 	}
+
+	cnc.recorder.Event(copyNode, v1.EventTypeNormal, "Synced", "Node synced successfully")
+	return nil
 }

 // getNodeModifiersFromCloudProvider returns a slice of nodeModifiers that update
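syncNode keeps the existing clientretry.RetryOnConflict(UpdateNodeSpecBackoff, ...) wrapper around the node update, so a stale read from the lister cache that produces a 409 Conflict on Update is simply retried. A minimal standalone sketch of that primitive, using retry.DefaultBackoff and a simulated conflict in place of the controller's UpdateNodeSpecBackoff and a real API call:

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/retry"
)

func main() {
	attempts := 0

	// RetryOnConflict re-runs the function only while it returns a
	// Conflict error (HTTP 409), backing off between attempts.
	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		attempts++
		if attempts < 3 {
			// Simulate a stale-object conflict from the API server.
			return apierrors.NewConflict(
				schema.GroupResource{Resource: "nodes"}, "node-1",
				fmt.Errorf("object was modified"))
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}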
