
Commit a8673fa

KCCM: add test validating slow node sync issue
1 parent e8d4559 commit a8673fa

File tree

2 files changed: +203 −36 lines


staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go

Lines changed: 178 additions & 33 deletions
@@ -23,6 +23,7 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -146,13 +147,16 @@ func defaultExternalService() *v1.Service {
 	return newService("external-balancer", v1.ServiceTypeLoadBalancer)
 }
 
-func alwaysReady() bool { return true }
-
-func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
+// newController creates a new service controller. Callers have the option to
+// specify `stopCh` for test cases which might require running the
+// node/service informers and reacting to resource events. Callers can also
+// specify `objects`, which represent the initial state of objects, used to
+// populate the client set / informer cache at start-up.
+func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 	cloud := &fakecloud.Cloud{}
 	cloud.Region = region
 
-	kubeClient := fake.NewSimpleClientset()
+	kubeClient := fake.NewSimpleClientset(objects...)
 	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
 	serviceInformer := informerFactory.Core().V1().Services()
 	nodeInformer := informerFactory.Core().V1().Nodes()
@@ -162,26 +166,36 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
 
 	controller := &Controller{
-		cloud:            cloud,
-		kubeClient:       kubeClient,
-		clusterName:      "test-cluster",
-		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
-		eventBroadcaster: broadcaster,
-		eventRecorder:    recorder,
-		nodeLister:       newFakeNodeLister(nil),
-		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
-		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:  []*v1.Node{},
+		cloud:               cloud,
+		kubeClient:          kubeClient,
+		clusterName:         "test-cluster",
+		eventBroadcaster:    broadcaster,
+		eventRecorder:       recorder,
+		serviceLister:       serviceInformer.Lister(),
+		serviceListerSynced: serviceInformer.Informer().HasSynced,
+		nodeLister:          nodeInformer.Lister(),
+		nodeListerSynced:    nodeInformer.Informer().HasSynced,
+		serviceQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+		nodeQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
+		lastSyncedNodes:     []*v1.Node{},
 	}
 
+	informerFactory.Start(stopCh)
+	informerFactory.WaitForCacheSync(stopCh)
+
+	serviceMap := make(map[string]*cachedService)
+	services, _ := serviceInformer.Lister().List(labels.Everything())
+	for _, service := range services {
+		serviceMap[service.Name] = &cachedService{
+			state: service,
+		}
+	}
+
+	controller.cache = &serviceCache{serviceMap: serviceMap}
+
 	balancer, _ := cloud.LoadBalancer()
 	controller.balancer = balancer
 
-	controller.serviceLister = serviceInformer.Lister()
-
-	controller.nodeListerSynced = alwaysReady
-	controller.serviceListerSynced = alwaysReady
 	controller.eventRecorder = record.NewFakeRecorder(100)
 
 	cloud.Calls = nil // ignore any cloud calls made in init()
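
With the reworked constructor, call sites that don't need running informers simply pass nil (as in the conversions below), while tests that do pass a stop channel plus seed objects. A minimal usage sketch, reusing this file's existing makeNode/tweakName/newService helpers:

	stopCh := make(chan struct{})
	defer close(stopCh)
	node := makeNode(tweakName("node1"))
	svc := newService("service1", v1.ServiceTypeLoadBalancer)
	// newController starts the informers and waits for their caches to sync,
	// so the node/service listers already see node and svc at this point.
	controller, cloud, client := newController(stopCh, node, svc)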
@@ -265,7 +279,7 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
 		t.Run(tc.desc, func(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			controller, cloud, client := newController()
+			controller, cloud, client := newController(nil)
 			cloud.Exists = tc.lbExists
 			key := fmt.Sprintf("%s/%s", tc.service.Namespace, tc.service.Name)
 			if _, err := client.CoreV1().Services(tc.service.Namespace).Create(ctx, tc.service, metav1.CreateOptions{}); err != nil {
@@ -439,7 +453,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 		t.Run(item.desc, func(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			controller, cloud, _ := newController()
+			controller, cloud, _ := newController(nil)
 			controller.nodeLister = newFakeNodeLister(nil, nodes...)
 			if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
 				t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
@@ -590,7 +604,8 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) {
 		expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
 	}} {
 		t.Run(tc.desc, func(t *testing.T) {
-			controller, cloud, _ := newController()
+			controller, cloud, _ := newController(nil)
+
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
@@ -762,7 +777,8 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
 		expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
 	}} {
 		t.Run(tc.desc, func(t *testing.T) {
-			controller, cloud, _ := newController()
+			controller, cloud, _ := newController(nil)
+
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
@@ -806,7 +822,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 		serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName()))
 	}
 
-	controller, cloud, _ := newController()
+	controller, cloud, _ := newController(nil)
 	for _, tc := range []struct {
 		desc  string
 		nodes []*v1.Node
@@ -901,8 +917,28 @@ func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall
 	}
 }
 
+// compareHostSets returns true if the nodes in left match the nodes in right,
+// regardless of order.
+func compareHostSets(t *testing.T, left, right []*v1.Node) bool {
+	if len(left) != len(right) {
+		return false
+	}
+	for _, lHost := range left {
+		found := false
+		for _, rHost := range right {
+			if reflect.DeepEqual(lHost, rHost) {
+				found = true
+				break
+			}
+		}
+		if !found {
+			return false
+		}
+	}
+	return true
+}
+
 func TestNodesNotEqual(t *testing.T) {
-	controller, cloud, _ := newController()
+	controller, cloud, _ := newController(nil)
 
 	services := []*v1.Service{
 		newService("s0", v1.ServiceTypeLoadBalancer),
@@ -952,7 +988,9 @@ func TestNodesNotEqual(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 			controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...)
+
 			controller.lastSyncedNodes = tc.lastSyncNodes
+
 			controller.updateLoadBalancerHosts(ctx, services, 5)
 			compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
 			cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{}
@@ -961,7 +999,7 @@
 }
 
 func TestProcessServiceCreateOrUpdate(t *testing.T) {
-	controller, _, client := newController()
+	controller, _, client := newController(nil)
 
 	//A pair of old and new loadbalancer IP address
 	oldLBIP := "192.168.1.1"
@@ -1076,7 +1114,7 @@ func TestProcessServiceCreateOrUpdateK8sError(t *testing.T) {
 			svc := newService(svcName, v1.ServiceTypeLoadBalancer)
 			// Preset finalizer so k8s error only happens when patching status.
 			svc.Finalizers = []string{servicehelper.LoadBalancerCleanupFinalizer}
-			controller, _, client := newController()
+			controller, _, client := newController(nil)
 			client.PrependReactor("patch", "services", func(action core.Action) (bool, runtime.Object, error) {
 				return true, nil, tc.k8sErr
 			})
@@ -1120,7 +1158,7 @@ func TestSyncService(t *testing.T) {
 			testName: "if an invalid service name is synced",
 			key:      "invalid/key/string",
 			updateFn: func() {
-				controller, _, _ = newController()
+				controller, _, _ = newController(nil)
 			},
 			expectedFn: func(e error) error {
 				//TODO: should find a way to test for dependent package errors in such a way that it won't break
@@ -1152,7 +1190,7 @@
 			key: "external-balancer",
 			updateFn: func() {
 				testSvc := defaultExternalService()
-				controller, _, _ = newController()
+				controller, _, _ = newController(nil)
 				controller.enqueueService(testSvc)
 				svc := controller.cache.getOrCreate("external-balancer")
 				svc.state = testSvc
@@ -1258,7 +1296,7 @@ func TestProcessServiceDeletion(t *testing.T) {
 		defer cancel()
 
 		//Create a new controller.
-		controller, cloud, _ = newController()
+		controller, cloud, _ = newController(nil)
 		tc.updateFn(controller)
 		obtainedErr := controller.processServiceDeletion(ctx, svcKey)
 		if err := tc.expectedFn(obtainedErr); err != nil {
@@ -1333,8 +1371,115 @@ func TestNeedsCleanup(t *testing.T) {
 
 }
 
-func TestNeedsUpdate(t *testing.T) {
+// This tests a service update while a slow node sync is happening. If multiple
+// services need to be processed by a node sync, each service will experience a
+// sync delay. If a new Node is added and a service is synced while this
+// happens, we want to make sure that the slow node sync never removes the Node
+// from the LB set, because it has stale data.
+func TestSlowNodeSync(t *testing.T) {
+	stopCh, updateCallCh := make(chan struct{}), make(chan fakecloud.UpdateBalancerCall)
+	defer close(stopCh)
+	defer close(updateCallCh)
+
+	duration := time.Millisecond
 
+	syncService := make(chan string)
+
+	node1 := makeNode(tweakName("node1"))
+	node2 := makeNode(tweakName("node2"))
+	node3 := makeNode(tweakName("node3"))
+	service1 := newService("service1", v1.ServiceTypeLoadBalancer)
+	service2 := newService("service2", v1.ServiceTypeLoadBalancer)
+
+	sKey1, _ := cache.MetaNamespaceKeyFunc(service1)
+	sKey2, _ := cache.MetaNamespaceKeyFunc(service2)
+	serviceKeys := sets.New(sKey1, sKey2)
+
+	controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2)
+	cloudProvider.RequestDelay = 4 * duration
+	cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) {
+		updateCallCh <- update
+	}
+	cloudProvider.EnsureCallCb = func(update fakecloud.UpdateBalancerCall) {
+		updateCallCh <- update
+	}
+
+	// Three update calls are expected. This is because this test calls
+	// controller.syncNodes once with two existing services, so we will have an
+	// update call for each service, and controller.syncService once. The end
+	// result is therefore three update calls. Each update call takes
+	// cloudProvider.RequestDelay to process. The test asserts that the order of
+	// the Hosts defined by the update calls is respected, but doesn't
+	// necessarily assert the order of the Service. This is because the
+	// controller implementation doesn't use a deterministic order when syncing
+	// services. The test therefore works out which service is impacted by the
+	// slow node sync (which will be whatever service is not synced first) and
+	// then validates that the Hosts for each update call are respected.
+	expectedUpdateCalls := []fakecloud.UpdateBalancerCall{
+		// First update call for first service from controller.syncNodes
+		{Service: service1, Hosts: []*v1.Node{node1, node2}},
+		// Second update call for impacted service from controller.syncService
+		{Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
+		// Third update call for second service from controller.syncNodes. Here
+		// is the problem: this update call removes the previously added node3.
+		{Service: service2, Hosts: []*v1.Node{node1, node2}},
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		controller.syncNodes(context.TODO(), 1)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		updateCallIdx := 0
+		impactedService := ""
+		for update := range updateCallCh {
+			// Validate that the call hosts are what we expect
+			if !compareHostSets(t, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts) {
+				t.Errorf("unexpected updated hosts for update: %v, expected: %v, got: %v", updateCallIdx, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts)
+				return
+			}
+			key, _ := cache.MetaNamespaceKeyFunc(update.Service)
+			// For call 0: determine impacted service
+			if updateCallIdx == 0 {
+				impactedService = serviceKeys.Difference(sets.New(key)).UnsortedList()[0]
+				syncService <- impactedService
+			}
+			// For calls > 0: validate the impacted service
+			if updateCallIdx > 0 {
+				if key != impactedService {
+					t.Error("unexpected impacted service")
+					return
+				}
+			}
+			if updateCallIdx == len(expectedUpdateCalls)-1 {
+				return
+			}
+			updateCallIdx++
+		}
+	}()
+
+	key := <-syncService
+	if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil {
+		t.Fatalf("error creating node3, err: %v", err)
+	}
+
+	// Give it some time to update the informer cache; this needs to be lower
+	// than cloudProvider.RequestDelay.
+	time.Sleep(duration)
+	// Sync the service
+	if err := controller.syncService(context.TODO(), key); err != nil {
+		t.Errorf("unexpected service sync error, err: %v", err)
+	}
+
+	wg.Wait()
+}
+
+func TestNeedsUpdate(t *testing.T) {
 	testCases := []struct {
 		testName string                            //Name of the test case
 		updateFn func() (*v1.Service, *v1.Service) //Function to update the service object
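
Read against expectedUpdateCalls above, the interleaving the test engineers looks roughly like the following (assuming duration = 1ms, so RequestDelay = 4ms; the exact scheduling is not deterministic):

	t≈0ms  syncNodes snapshots nodes {node1, node2} and begins update #1:
	       first service -> {node1, node2}
	t≈1ms  the watcher goroutine reports which service went first; the test
	       creates node3 and sleeps 1ms so the informer cache catches up
	t≈2ms  syncService on the impacted service issues update #2 ->
	       {node1, node2, node3}
	later  syncNodes reaches the impacted service with its stale snapshot and
	       issues update #3 -> {node1, node2}, dropping node3 again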
@@ -1494,7 +1639,7 @@ func TestNeedsUpdate(t *testing.T) {
 		expectedNeedsUpdate: true,
 	}}
 
-	controller, _, _ := newController()
+	controller, _, _ := newController(nil)
 	for _, tc := range testCases {
 		oldSvc, newSvc := tc.updateFn()
 		obtainedResult := controller.needsUpdate(oldSvc, newSvc)
@@ -2441,7 +2586,7 @@ func TestServiceQueueDelay(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			controller, cloud, client := newController()
+			controller, cloud, client := newController(nil)
 			queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")}
 			controller.serviceQueue = queue
 			cloud.Err = tc.lbCloudErr
