Skip to content

Commit 12f93cb

Browse files
authored
nodeTopology: de-dup enqueue when there is no label change (#856)
1 parent e566eb7 commit 12f93cb

File tree

3 files changed

+194
-16
lines changed

3 files changed

+194
-16
lines changed

pkg/controller/nodeipam/ipam/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ go_test(
9090
"//vendor/k8s.io/api/core/v1:core",
9191
"//vendor/k8s.io/apimachinery/pkg/api/resource",
9292
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:meta",
93+
"//vendor/k8s.io/apimachinery/pkg/runtime/schema",
9394
"//vendor/k8s.io/apimachinery/pkg/util/wait",
9495
"//vendor/k8s.io/client-go/informers",
9596
"//vendor/k8s.io/client-go/informers/core/v1:core",

pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
188188
ca.nodeTopologyQueue.Enqueue(node)
189189
return nil
190190
}),
191-
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
192-
nodetopologyQueue.Enqueue(newNode)
193-
return nil
194-
}),
191+
UpdateFunc: nodeutil.CreateUpdateNodeHandler(ca.updateUniqueNode),
195192
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
196193
nodetopologyQueue.Enqueue(node)
197194
return nil
@@ -268,6 +265,17 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
268265
return ca, nil
269266
}
270267

268+
func (ca *cloudCIDRAllocator) updateUniqueNode(oldNode, newNode *v1.Node) error {
269+
_, oldNodeLabel := getNodeSubnetLabel(oldNode)
270+
_, newNodeLabel := getNodeSubnetLabel(newNode)
271+
if oldNodeLabel != newNodeLabel {
272+
ca.nodeTopologyQueue.Enqueue(newNode)
273+
} else {
274+
klog.InfoS("Node subnet label does not change, skip enqueue item, label key: cloud.google.com/gke-node-pool-subnet", "node", newNode.GetName(), "oldlabel", oldNodeLabel, "newlabel", newNodeLabel)
275+
}
276+
return nil
277+
}
278+
271279
func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
272280
defer utilruntime.HandleCrash()
273281

pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go

Lines changed: 181 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
v1 "k8s.io/api/core/v1"
3939
"k8s.io/apimachinery/pkg/api/resource"
4040
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
41+
runtimeSchema "k8s.io/apimachinery/pkg/runtime/schema"
4142
"k8s.io/apimachinery/pkg/util/wait"
4243
"k8s.io/client-go/informers"
4344
"k8s.io/client-go/kubernetes/fake"
@@ -187,7 +188,7 @@ func TestNodeTopologyQueuePeriodicSync(t *testing.T) {
187188
}
188189
}
189190

190-
func TestNodeTopologyQueue_AddOrUpdate(t *testing.T) {
191+
func TestNodeTopologyCR_AddOrUpdateNode(t *testing.T) {
191192
testClusterValues := gce.DefaultTestClusterValues()
192193
testClusterValues.SubnetworkURL = exampleSubnetURL
193194
fakeGCE := gce.NewFakeGCECloud(testClusterValues)
@@ -210,7 +211,7 @@ func TestNodeTopologyQueue_AddOrUpdate(t *testing.T) {
210211
},
211212
},
212213
}
213-
fakeClient := fake.NewSimpleClientset(defaultnode, mscnode)
214+
fakeClient := fake.NewSimpleClientset(defaultnode)
214215
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, time.Second)
215216
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
216217

@@ -230,7 +231,8 @@ func TestNodeTopologyQueue_AddOrUpdate(t *testing.T) {
230231
go cloudAllocator.Run(wait.NeverStop)
231232

232233
// TODO: Fix node_topology_syncer addOrUpdate should add default subnet regardless of nodes ordering on the informer
233-
fakeNodeInformer.Informer().GetStore().Add(mscnode)
234+
time.Sleep(time.Millisecond * 500)
235+
fakeClient.Tracker().Add(mscnode)
234236
expectedSubnets := []string{"subnet-def", "subnet1"}
235237
i := 0
236238
for i < 5 {
@@ -267,9 +269,40 @@ func TestNodeTopologyQueue_AddOrUpdate(t *testing.T) {
267269
if i >= 5 {
268270
t.Fatalf("AddOrUpdate node topology CRD not working as expected")
269271
}
272+
// Node subnet label should be immutable, update it just to test update node path
273+
mscnode2.ObjectMeta.Labels[testNodePoolSubnetLabelPrefix] = "subnet3"
274+
// TODO: automatically get gvr instead of hardcode
275+
gvr := runtimeSchema.GroupVersionResource{
276+
Version: "v1",
277+
Resource: "nodes",
278+
}
279+
fakeClient.Tracker().Update(gvr, mscnode2, mscnode2.GetNamespace(), metav1.UpdateOptions{})
280+
expectedSubnets = []string{"subnet-def", "subnet1", "subnet2", "subnet3"}
281+
i = 0
282+
for i < 5 {
283+
if ok, _ := verifySubnetsInCR(t, expectedSubnets, nodeTopologyClient); ok {
284+
break
285+
} else {
286+
time.Sleep(time.Millisecond * 500)
287+
i++
288+
}
289+
}
290+
if i >= 5 {
291+
t.Fatalf("UpdateNode with different subnet lable should not dedup when enqueueing")
292+
}
293+
// Reset nodetopology just for test update node de-dup when the label didn't change
294+
nodeTopologyClient.NetworkingV1().NodeTopologies().UpdateStatus(context.TODO(), ensuredNodeTopologyCR, metav1.UpdateOptions{})
295+
// Update the node w/o changing node pool subnet label should de-dup, not enqueue
296+
mscnode2.ObjectMeta.Labels[testNodePoolSubnetLabelPrefix] = "subnet3"
297+
fakeClient.Tracker().Update(gvr, mscnode2, mscnode2.GetNamespace(), metav1.UpdateOptions{})
298+
time.Sleep(time.Millisecond * 500)
299+
expectedSubnets = []string{}
300+
if ok, _ := verifySubnetsInCR(t, expectedSubnets, nodeTopologyClient); !ok {
301+
t.Fatalf("UpdateNode with the same subnet lable should dedup when enqueueing")
302+
}
270303
}
271304

272-
func TestNodeTopologyCR_DELETION(t *testing.T) {
305+
func TestNodeTopologyCR_DeleteNode(t *testing.T) {
273306
testClusterValues := gce.DefaultTestClusterValues()
274307
testClusterValues.SubnetworkURL = exampleSubnetURL
275308
fakeGCE := gce.NewFakeGCECloud(testClusterValues)
@@ -279,15 +312,12 @@ func TestNodeTopologyCR_DELETION(t *testing.T) {
279312
nwInformer := nwInfFactory.V1().Networks()
280313
gnpInformer := nwInfFactory.V1().GKENetworkParamSets()
281314

282-
mscnode := &v1.Node{
315+
defaultnode := &v1.Node{
283316
ObjectMeta: metav1.ObjectMeta{
284-
Name: "testNode",
285-
Labels: map[string]string{
286-
testNodePoolSubnetLabelPrefix: "subnet1",
287-
},
317+
Name: "nodeTopologyDefautNode",
288318
},
289319
}
290-
fakeClient := fake.NewSimpleClientset()
320+
fakeClient := fake.NewSimpleClientset(defaultnode)
291321
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, time.Second)
292322
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
293323

@@ -306,9 +336,17 @@ func TestNodeTopologyCR_DELETION(t *testing.T) {
306336
fakeInformerFactory.Start(wait.NeverStop)
307337
go cloudAllocator.Run(wait.NeverStop)
308338

309-
fakeNodeInformer.Informer().GetStore().Add(mscnode)
339+
mscnode := &v1.Node{
340+
ObjectMeta: metav1.ObjectMeta{
341+
Name: "testNode",
342+
Labels: map[string]string{
343+
testNodePoolSubnetLabelPrefix: "subnet1",
344+
},
345+
},
346+
}
347+
fakeClient.Tracker().Add(mscnode)
310348

311-
expectedSubnets := []string{"subnet-def"}
349+
expectedSubnets := []string{"subnet-def", "subnet1"}
312350
i := 0
313351
for i < 5 {
314352
if ok, _ := verifySubnetsInCR(t, expectedSubnets, nodeTopologyClient); ok {
@@ -318,11 +356,142 @@ func TestNodeTopologyCR_DELETION(t *testing.T) {
318356
i++
319357
}
320358
}
359+
if i >= 5 {
360+
t.Fatalf("Add node topology CR not working as expected")
361+
}
362+
// TODO: automatically get gvr instead of using hardcoded value
363+
gvr := runtimeSchema.GroupVersionResource{
364+
Version: "v1",
365+
Resource: "nodes",
366+
}
367+
fakeClient.Tracker().Delete(gvr, mscnode.GetNamespace(), mscnode.GetName(), metav1.DeleteOptions{})
368+
369+
expectedSubnets = []string{"subnet-def"}
370+
i = 0
371+
for i < 5 {
372+
if ok, _ := verifySubnetsInCR(t, expectedSubnets, nodeTopologyClient); ok {
373+
break
374+
} else {
375+
time.Sleep(time.Millisecond * 500)
376+
i++
377+
}
378+
}
321379
if i >= 5 {
322380
t.Fatalf("Delete node topology CR not working as expected")
323381
}
324382
}
325383

384+
func TestUpdateUniqueNode(t *testing.T) {
385+
testClusterValues := gce.DefaultTestClusterValues()
386+
fakeGCE := gce.NewFakeGCECloud(testClusterValues)
387+
nodeTopologySyncer := &NodeTopologySyncer{
388+
nodeTopologyClient: ntfakeclient.NewSimpleClientset(),
389+
cloud: fakeGCE,
390+
}
391+
tests := []struct {
392+
name string
393+
oldNode *v1.Node
394+
newNode *v1.Node
395+
queued bool
396+
}{
397+
{
398+
name: "DuplicatedNodeLabel",
399+
oldNode: &v1.Node{
400+
ObjectMeta: metav1.ObjectMeta{
401+
Name: "testNode",
402+
Labels: map[string]string{
403+
testNodePoolSubnetLabelPrefix: "subnet1",
404+
},
405+
},
406+
},
407+
newNode: &v1.Node{
408+
ObjectMeta: metav1.ObjectMeta{
409+
Name: "testNode",
410+
Labels: map[string]string{
411+
testNodePoolSubnetLabelPrefix: "subnet1",
412+
},
413+
},
414+
},
415+
queued: false,
416+
},
417+
{
418+
name: "UpdatedNodeLable",
419+
oldNode: &v1.Node{
420+
ObjectMeta: metav1.ObjectMeta{
421+
Name: "testNode",
422+
Labels: map[string]string{
423+
testNodePoolSubnetLabelPrefix: "subnet1",
424+
},
425+
},
426+
},
427+
newNode: &v1.Node{
428+
ObjectMeta: metav1.ObjectMeta{
429+
Name: "testNode",
430+
Labels: map[string]string{
431+
testNodePoolSubnetLabelPrefix: "subnet2",
432+
},
433+
},
434+
},
435+
queued: true,
436+
},
437+
{
438+
name: "DifferentLabelName",
439+
oldNode: &v1.Node{
440+
ObjectMeta: metav1.ObjectMeta{
441+
Name: "testNode",
442+
Labels: map[string]string{
443+
"cloud.google.com/unrelated": "subnet1",
444+
},
445+
},
446+
},
447+
newNode: &v1.Node{
448+
ObjectMeta: metav1.ObjectMeta{
449+
Name: "testNode",
450+
Labels: map[string]string{
451+
testNodePoolSubnetLabelPrefix: "subnet1",
452+
},
453+
},
454+
},
455+
queued: true,
456+
},
457+
{
458+
name: "EmptyLabel",
459+
oldNode: &v1.Node{
460+
ObjectMeta: metav1.ObjectMeta{
461+
Name: "testNode",
462+
Labels: map[string]string{},
463+
},
464+
},
465+
newNode: &v1.Node{
466+
ObjectMeta: metav1.ObjectMeta{
467+
Name: "testNode",
468+
Labels: map[string]string{
469+
testNodePoolSubnetLabelPrefix: "subnet1",
470+
},
471+
},
472+
},
473+
queued: true,
474+
},
475+
}
476+
for _, tc := range tests {
477+
t.Run(tc.name, func(t *testing.T) {
478+
nodetopologyQueue := NewTaskQueue("nodetopologgTaskQueueForTest", "nodetopologyCRD", 1, nodeTopologyKeyFun, nodeTopologySyncer.sync)
479+
ca := &cloudCIDRAllocator{
480+
nodeTopologyQueue: nodetopologyQueue,
481+
}
482+
ca.updateUniqueNode(tc.oldNode, tc.newNode)
483+
expectLen := 0
484+
if tc.queued {
485+
expectLen = 1
486+
}
487+
got := nodetopologyQueue.queue.Len()
488+
if got != expectLen {
489+
t.Errorf("updateUniqueNode(%v, %v) returned queued %v, but want %v", tc.oldNode, tc.newNode, got, expectLen)
490+
}
491+
})
492+
}
493+
}
494+
326495
func TestUpdateCIDRAllocation(t *testing.T) {
327496
ipv4ipv6Stack := stackIPv4IPv6
328497
ipv6ipv4Stack := stackIPv6IPv4

0 commit comments

Comments
 (0)