
Commit 60338c7

KCCM: fix slow node sync + service update
1 parent: a8673fa

File tree

2 files changed, +55 -39 lines

staging/src/k8s.io/cloud-provider/controllers/service/controller.go

Lines changed: 40 additions & 32 deletions
@@ -93,10 +93,10 @@ type Controller struct {
     serviceQueue workqueue.RateLimitingInterface
     nodeQueue    workqueue.RateLimitingInterface
     // lastSyncedNodes is used when reconciling node state and keeps track of
-    // the last synced set of nodes. This field is concurrently safe because the
-    // nodeQueue is serviced by only one go-routine, so node events are not
-    // processed concurrently.
-    lastSyncedNodes []*v1.Node
+    // the last synced set of nodes per service key. This is accessed from the
+    // service and node controllers, hence it is protected by a lock.
+    lastSyncedNodes     map[string][]*v1.Node
+    lastSyncedNodesLock sync.Mutex
 }

 // New returns a new service controller to keep cloud provider service resources
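The new field is keyed by the service's namespaced key and guarded by a mutex because both the service and node sync paths touch it. Below is a minimal sketch, not part of this commit, of how such a key is derived with cache.MetaNamespaceKeyFunc and what a lookup for a never-synced service yields (a nil slice, since Go map reads of missing keys return the zero value):

```go
package main

import (
    "fmt"
    "sync"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "my-lb"}}

    // MetaNamespaceKeyFunc yields "<namespace>/<name>", e.g. "default/my-lb".
    key, _ := cache.MetaNamespaceKeyFunc(svc)

    var mu sync.Mutex
    lastSyncedNodes := make(map[string][]*v1.Node)

    mu.Lock()
    nodes := lastSyncedNodes[key] // nil for a service that has never been synced
    mu.Unlock()

    fmt.Println(key, len(nodes)) // "default/my-lb 0"
}
```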
@@ -124,7 +124,7 @@ func New(
         nodeListerSynced: nodeInformer.Informer().HasSynced,
         serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
         nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-        lastSyncedNodes:  []*v1.Node{},
+        lastSyncedNodes:  make(map[string][]*v1.Node),
     }

     serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -439,16 +439,32 @@ func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service
     if err != nil {
         return nil, err
     }
-
     // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
     if len(nodes) == 0 {
         c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
     }
-
-    // - Only one protocol supported per service
     // - Not all cloud providers support all protocols and the next step is expected to return
     // an error for unsupported protocols
-    return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+    status, err := c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+    if err != nil {
+        return nil, err
+    }
+    c.storeLastSyncedNodes(service, nodes)
+    return status, nil
+}
+
+func (c *Controller) storeLastSyncedNodes(svc *v1.Service, nodes []*v1.Node) {
+    c.lastSyncedNodesLock.Lock()
+    defer c.lastSyncedNodesLock.Unlock()
+    key, _ := cache.MetaNamespaceKeyFunc(svc)
+    c.lastSyncedNodes[key] = nodes
+}
+
+func (c *Controller) getLastSyncedNodes(svc *v1.Service) []*v1.Node {
+    c.lastSyncedNodesLock.Lock()
+    defer c.lastSyncedNodesLock.Unlock()
+    key, _ := cache.MetaNamespaceKeyFunc(svc)
+    return c.lastSyncedNodes[key]
 }

 // ListKeys implements the interface required by DeltaFIFO to list the keys we
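Note that the node set is cached only after EnsureLoadBalancer succeeds. A generic sketch of this success-gated caching pattern, with hypothetical names (ensureAndRecord is not part of the controller) and the service key assumed as the cache key:

```go
package main

import (
    "errors"
    "fmt"
)

// ensureAndRecord sketches the pattern used in ensureLoadBalancer above: the
// host set is cached only after the cloud call succeeds, so a failed call can
// never make a later comparison against the cache look like a no-op.
func ensureAndRecord(cache map[string][]string, key string, hosts []string, ensure func([]string) error) error {
    if err := ensure(hosts); err != nil {
        return err // nothing cached; the retry still sees the old (or empty) set
    }
    cache[key] = hosts
    return nil
}

func main() {
    cache := map[string][]string{}

    // Failed call: the cache stays empty, so the next sync still sees a difference.
    _ = ensureAndRecord(cache, "default/my-lb", []string{"node1"}, func([]string) error {
        return errors.New("cloud provider unavailable")
    })
    fmt.Println(cache["default/my-lb"]) // []

    // Successful call: the host set is recorded.
    _ = ensureAndRecord(cache, "default/my-lb", []string{"node1", "node2"}, func([]string) error { return nil })
    fmt.Println(cache["default/my-lb"]) // [node1 node2]
}
```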
@@ -662,15 +678,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
     return true
 }

-func serviceKeys(services []*v1.Service) sets.String {
-    ret := sets.NewString()
-    for _, service := range services {
-        key, _ := cache.MetaNamespaceKeyFunc(service)
-        ret.Insert(key)
-    }
-    return ret
-}
-
 func nodeNames(nodes []*v1.Node) sets.String {
     ret := sets.NewString()
     for _, node := range nodes {
@@ -737,19 +744,29 @@ func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
 // load balancers and finished doing it successfully, or didn't try to at all because
 // there's no need. This function returns true if we tried to update load balancers and
 // failed, indicating to the caller that we should try again.
-func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
+func (c *Controller) nodeSyncService(svc *v1.Service) bool {
     const retSuccess = false
     const retNeedRetry = true
     if svc == nil || !wantsLoadBalancer(svc) {
         return retSuccess
     }
+    newNodes, err := listWithPredicates(c.nodeLister)
+    if err != nil {
+        runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+        nodeSyncErrorCount.Inc()
+        return retNeedRetry
+    }
     newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
-    oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
-
+    oldNodes := filterWithPredicates(c.getLastSyncedNodes(svc), getNodePredicatesForService(svc)...)
+    // Store last synced nodes without actually determining if we successfully
+    // synced them or not. Failed node syncs are passed off to retries in the
+    // service queue, so no need to wait. If we don't store it now, we risk
+    // re-syncing all LBs twice, one from another sync in the node sync and
+    // from the service sync
+    c.storeLastSyncedNodes(svc, newNodes)
     if nodesSufficientlyEqual(oldNodes, newNodes) {
         return retSuccess
     }
-
     klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
     if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
         runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
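nodeSyncService now lists nodes itself and narrows both the fresh and the last-synced sets with the service's node predicates before comparing them. filterWithPredicates and getNodePredicatesForService are defined elsewhere in this package and are not shown in the diff; the sketch below only illustrates the shape of that filtering, with an assumed predicate type and a readiness check as the example predicate:

```go
package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// nodePredicate mirrors the shape of the predicates passed to
// filterWithPredicates (name assumed for this sketch).
type nodePredicate func(*v1.Node) bool

// filterNodes keeps a node only if every predicate accepts it.
func filterNodes(nodes []*v1.Node, preds ...nodePredicate) []*v1.Node {
    var out []*v1.Node
    for _, n := range nodes {
        keep := true
        for _, p := range preds {
            if !p(n) {
                keep = false
                break
            }
        }
        if keep {
            out = append(out, n)
        }
    }
    return out
}

func main() {
    ready := func(n *v1.Node) bool {
        for _, c := range n.Status.Conditions {
            if c.Type == v1.NodeReady {
                return c.Status == v1.ConditionTrue
            }
        }
        return false
    }
    nodes := []*v1.Node{
        {Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}},
        {Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}}},
    }
    fmt.Println(len(filterNodes(nodes, ready))) // 1
}
```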
@@ -794,20 +811,12 @@ func nodesSufficientlyEqual(oldNodes, newNodes []*v1.Node) bool {
 func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
     klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)

-    // Include all nodes and let nodeSyncService filter and figure out if
-    // the update is relevant for the service in question.
-    nodes, err := listWithPredicates(c.nodeLister)
-    if err != nil {
-        runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-        return serviceKeys(services)
-    }
-
     // lock for servicesToRetry
     servicesToRetry = sets.NewString()
     lock := sync.Mutex{}

     doWork := func(piece int) {
-        if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
+        if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
             return
         }
         lock.Lock()
@@ -816,7 +825,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
         servicesToRetry.Insert(key)
     }
     workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
-    c.lastSyncedNodes = nodes
     klog.V(4).Infof("Finished updateLoadBalancerHosts")
     return servicesToRetry
 }
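updateLoadBalancerHosts still fans the per-service work out with workqueue.ParallelizeUntil and collects retries under a mutex; only the shared node snapshot is gone. A standalone sketch of that fan-out pattern (the failure condition is made up for illustration):

```go
package main

import (
    "context"
    "fmt"
    "sync"

    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    services := []string{"default/a", "default/b", "default/c"}

    // servicesToRetry is shared by the workers, so it is guarded by a mutex,
    // matching the lock/doWork pattern in updateLoadBalancerHosts above.
    servicesToRetry := sets.NewString()
    var lock sync.Mutex

    doWork := func(piece int) {
        // Pretend only the odd-indexed service fails and needs a retry.
        if piece%2 == 0 {
            return
        }
        lock.Lock()
        defer lock.Unlock()
        servicesToRetry.Insert(services[piece])
    }

    // ParallelizeUntil spreads the pieces over the given number of workers.
    workqueue.ParallelizeUntil(context.Background(), 2, len(services), doWork)
    fmt.Println(servicesToRetry.List()) // [default/b]
}
```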

staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go

Lines changed: 15 additions & 7 deletions
@@ -177,7 +177,7 @@ func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controll
         nodeListerSynced: nodeInformer.Informer().HasSynced,
         serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
         nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-        lastSyncedNodes:  []*v1.Node{},
+        lastSyncedNodes:  make(map[string][]*v1.Node),
     }

     informerFactory.Start(stopCh)
@@ -609,7 +609,10 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) {
             ctx, cancel := context.WithCancel(context.Background())
             defer cancel()

-            controller.lastSyncedNodes = tc.initialState
+            for _, svc := range services {
+                key, _ := cache.MetaNamespaceKeyFunc(svc)
+                controller.lastSyncedNodes[key] = tc.initialState
+            }

             for _, state := range tc.stateChanges {
                 setupState := func() {
@@ -782,7 +785,10 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
             ctx, cancel := context.WithCancel(context.Background())
             defer cancel()

-            controller.lastSyncedNodes = tc.initialState
+            for _, svc := range services {
+                key, _ := cache.MetaNamespaceKeyFunc(svc)
+                controller.lastSyncedNodes[key] = tc.initialState
+            }

             for _, state := range tc.stateChanges {
                 setupState := func() {
@@ -989,7 +995,10 @@ func TestNodesNotEqual(t *testing.T) {
             defer cancel()
             controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...)

-            controller.lastSyncedNodes = tc.lastSyncNodes
+            for _, svc := range services {
+                key, _ := cache.MetaNamespaceKeyFunc(svc)
+                controller.lastSyncedNodes[key] = tc.lastSyncNodes
+            }

             controller.updateLoadBalancerHosts(ctx, services, 5)
             compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
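The same three-line seeding loop now appears in three tests; a hypothetical helper in controller_test.go (not part of this commit, names assumed, relying on the v1 and cache imports the test file already has) could keep them in sync:

```go
// seedLastSyncedNodes mirrors the seeding loops added above: every service
// key starts out from the same initial node set.
func seedLastSyncedNodes(lastSyncedNodes map[string][]*v1.Node, services []*v1.Service, nodes []*v1.Node) {
    for _, svc := range services {
        key, _ := cache.MetaNamespaceKeyFunc(svc)
        lastSyncedNodes[key] = nodes
    }
}
```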
@@ -1420,9 +1429,8 @@ func TestSlowNodeSync(t *testing.T) {
         {Service: service1, Hosts: []*v1.Node{node1, node2}},
         // Second update call for impacted service from controller.syncService
         {Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
-        // Third update call for second service from controller.syncNodes. Here
-        // is the problem: this update call removes the previously added node3.
-        {Service: service2, Hosts: []*v1.Node{node1, node2}},
+        // Third update call for second service from controller.syncNodes.
+        {Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
     }

     wg := sync.WaitGroup{}
