@@ -89,11 +89,10 @@ type cloudCIDRAllocator struct {
89
89
nodeLister corelisters.NodeLister
90
90
// nodesSynced returns true if the node shared informer has been synced at least once.
91
91
nodesSynced cache.InformerSynced
92
- // nodeTopologyClient will be used to read/patch the nodetopology CR.
93
- nodeTopologyClient nodetopologyclientset.Interface
94
92
95
- recorder record.EventRecorder
96
- queue workqueue.RateLimitingInterface
93
+ recorder record.EventRecorder
94
+ queue workqueue.RateLimitingInterface
95
+ nodeTopologyQueue * TaskQueue
97
96
98
97
stackType clusterStackType
99
98
}
@@ -132,17 +131,24 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
132
131
stackType = stackIPv6
133
132
}
134
133
135
- ca := & cloudCIDRAllocator {
136
- client : client ,
134
+ nodeTopologySyncer := & NodeTopologySyncer {
135
+ nodeTopologyClient : nodeTopologyClient ,
137
136
cloud : gceCloud ,
138
- networksLister : nwInformer .Lister (),
139
- gnpLister : gnpInformer .Lister (),
140
137
nodeLister : nodeInformer .Lister (),
141
- nodesSynced : nodeInformer .Informer ().HasSynced ,
142
- nodeTopologyClient : nodeTopologyClient ,
143
- recorder : recorder ,
144
- queue : workqueue .NewRateLimitingQueueWithConfig (workqueue .DefaultControllerRateLimiter (), workqueue.RateLimitingQueueConfig {Name : workqueueName }),
145
- stackType : stackType ,
138
+ }
139
+ nodetopologyQueue := NewTaskQueue ("nodetopologgTaskQueue" , "nodetopologyCRD" , nodeTopologyWorkers , nodeTopologyKeyFun , nodeTopologySyncer .sync )
140
+
141
+ ca := & cloudCIDRAllocator {
142
+ client : client ,
143
+ cloud : gceCloud ,
144
+ networksLister : nwInformer .Lister (),
145
+ gnpLister : gnpInformer .Lister (),
146
+ nodeLister : nodeInformer .Lister (),
147
+ nodesSynced : nodeInformer .Informer ().HasSynced ,
148
+ recorder : recorder ,
149
+ queue : workqueue .NewRateLimitingQueueWithConfig (workqueue .DefaultControllerRateLimiter (), workqueue.RateLimitingQueueConfig {Name : workqueueName }),
150
+ nodeTopologyQueue : nodetopologyQueue ,
151
+ stackType : stackType ,
146
152
}
147
153
148
154
nodeInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
@@ -168,6 +174,26 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
168
174
}),
169
175
DeleteFunc : nodeutil .CreateDeleteNodeHandler (ca .ReleaseCIDR ),
170
176
})
177
+ nodeInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
178
+ AddFunc : nodeutil .CreateAddNodeHandler (func (node * v1.Node ) error {
179
+ if ca .nodeTopologyQueue != nil {
180
+ ca .nodeTopologyQueue .Enqueue (node )
181
+ }
182
+ return nil
183
+ }),
184
+ UpdateFunc : nodeutil .CreateUpdateNodeHandler (func (oldNode , newNode * v1.Node ) error {
185
+ if ca .nodeTopologyQueue != nil {
186
+ nodetopologyQueue .Enqueue (newNode )
187
+ }
188
+ return nil
189
+ }),
190
+ DeleteFunc : nodeutil .CreateDeleteNodeHandler (func (node * v1.Node ) error {
191
+ if ca .nodeTopologyQueue != nil {
192
+ nodetopologyQueue .Enqueue (node )
193
+ }
194
+ return nil
195
+ }),
196
+ })
171
197
172
198
nwInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
173
199
AddFunc : func (originalObj interface {}) {
@@ -244,6 +270,7 @@ func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
244
270
ctx , cancelFn := context .WithCancel (context .Background ())
245
271
defer cancelFn ()
246
272
defer ca .queue .ShutDown ()
273
+ defer ca .nodeTopologyQueue .Shutdown ()
247
274
248
275
klog .Infof ("Starting cloud CIDR allocator" )
249
276
defer klog .Infof ("Shutting down cloud CIDR allocator" )
@@ -255,6 +282,18 @@ func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
255
282
for i := 0 ; i < cidrUpdateWorkers ; i ++ {
256
283
go wait .UntilWithContext (ctx , ca .runWorker , time .Second )
257
284
}
285
+ if ca .nodeTopologyQueue != nil {
286
+ ca .nodeTopologyQueue .Run ()
287
+ }
288
+
289
+ go func () {
290
+ time .Sleep (nodeTopologyReconcileInterval )
291
+ wait .Until (
292
+ func () {
293
+ ca .nodeTopologyQueue .Enqueue (nodeTopologyReconcileFakeNode )
294
+ },
295
+ nodeTopologyReconcileInterval , stopCh )
296
+ }()
258
297
259
298
<- stopCh
260
299
}
@@ -433,11 +472,6 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
433
472
}
434
473
}
435
474
436
- if err := ca .updateNodeTopology (node ); err != nil {
437
- // This is only required for multi subnet clusters. Log and ignore the error.
438
- klog .ErrorS (err , "Failed to update the node topology resource" , "nodeName" , node .Name )
439
- }
440
-
441
475
return err
442
476
}
443
477
@@ -522,7 +556,6 @@ func (ca *cloudCIDRAllocator) updateNodeCIDR(node, oldNode *v1.Node) error {
522
556
func (ca * cloudCIDRAllocator ) ReleaseCIDR (node * v1.Node ) error {
523
557
klog .V (2 ).Infof ("Node %v PodCIDR (%v) will be released by external cloud provider (not managed by controller)" ,
524
558
node .Name , node .Spec .PodCIDR )
525
- // TODO: Handle the nodetopology CR subnet deletion here
526
559
return nil
527
560
}
528
561
0 commit comments