@@ -93,10 +93,10 @@ type Controller struct {
 	serviceQueue workqueue.RateLimitingInterface
 	nodeQueue    workqueue.RateLimitingInterface
 	// lastSyncedNodes is used when reconciling node state and keeps track of
-	// the last synced set of nodes. This field is concurrently safe because the
-	// nodeQueue is serviced by only one go-routine, so node events are not
-	// processed concurrently.
-	lastSyncedNodes []*v1.Node
+	// the last synced set of nodes per service key. This is accessed from the
+	// service and node controllers, hence it is protected by a lock.
+	lastSyncedNodes     map[string][]*v1.Node
+	lastSyncedNodesLock sync.Mutex
 }
 
 // New returns a new service controller to keep cloud provider service resources
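The hunk above replaces the single node slice with a per-service map guarded by a `sync.Mutex`, because both the service and node controllers now touch it. A minimal standalone sketch of that pattern (names here are illustrative, not from the PR; `[]string` stands in for `[]*v1.Node`):

```go
package main

import (
	"fmt"
	"sync"
)

// nodeCache mirrors the lastSyncedNodes + lastSyncedNodesLock pattern:
// a map keyed by service key, guarded by a mutex so two controllers can
// read and write it concurrently.
type nodeCache struct {
	mu    sync.Mutex
	nodes map[string][]string
}

func (c *nodeCache) store(key string, nodes []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes[key] = nodes
}

func (c *nodeCache) get(key string) []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.nodes[key]
}

func main() {
	c := &nodeCache{nodes: make(map[string][]string)}
	var wg sync.WaitGroup
	// Two concurrent writers, as the service and node controllers would be;
	// the mutex keeps the map write safe.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.store(fmt.Sprintf("default/svc-%d", i), []string{"node-a", "node-b"})
		}(i)
	}
	wg.Wait()
	fmt.Println(c.get("default/svc-0")) // [node-a node-b]
}
```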
@@ -124,7 +124,7 @@ func New(
 		nodeListerSynced:  nodeInformer.Informer().HasSynced,
 		serviceQueue:      workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		nodeQueue:         workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:   []*v1.Node{},
+		lastSyncedNodes:   make(map[string][]*v1.Node),
 	}
 
 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -439,16 +439,32 @@ func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service
 	if err != nil {
 		return nil, err
 	}
-
 	// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
 	if len(nodes) == 0 {
 		c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
 	}
-
-	// - Only one protocol supported per service
+	c.storeLastSyncedNodes(service, nodes)
 	// - Not all cloud providers support all protocols and the next step is expected to return
 	// an error for unsupported protocols
-	return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+	status, err := c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+	if err != nil {
+		return nil, err
+	}
+	return status, nil
+}
+
+func (c *Controller) storeLastSyncedNodes(svc *v1.Service, nodes []*v1.Node) {
+	c.lastSyncedNodesLock.Lock()
+	defer c.lastSyncedNodesLock.Unlock()
+	key, _ := cache.MetaNamespaceKeyFunc(svc)
+	c.lastSyncedNodes[key] = nodes
+}
+
+func (c *Controller) getLastSyncedNodes(svc *v1.Service) []*v1.Node {
+	c.lastSyncedNodesLock.Lock()
+	defer c.lastSyncedNodesLock.Unlock()
+	key, _ := cache.MetaNamespaceKeyFunc(svc)
+	return c.lastSyncedNodes[key]
 }
 
 // ListKeys implements the interface required by DeltaFIFO to list the keys we
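Both new helpers key the map with `cache.MetaNamespaceKeyFunc`, which for a namespaced object returns `<namespace>/<name>`. A quick sketch of what the key looks like (the service name here is made up for illustration):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web"}}
	// MetaNamespaceKeyFunc is the standard informer/workqueue key function.
	key, err := cache.MetaNamespaceKeyFunc(svc)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // default/web
}
```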
@@ -662,15 +678,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
 	return true
 }
 
-func serviceKeys(services []*v1.Service) sets.String {
-	ret := sets.NewString()
-	for _, service := range services {
-		key, _ := cache.MetaNamespaceKeyFunc(service)
-		ret.Insert(key)
-	}
-	return ret
-}
-
 func nodeNames(nodes []*v1.Node) sets.String {
 	ret := sets.NewString()
 	for _, node := range nodes {
@@ -737,19 +744,29 @@ func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
 // load balancers and finished doing it successfully, or didn't try to at all because
 // there's no need. This function returns true if we tried to update load balancers and
 // failed, indicating to the caller that we should try again.
-func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
+func (c *Controller) nodeSyncService(svc *v1.Service) bool {
 	const retSuccess = false
 	const retNeedRetry = true
 	if svc == nil || !wantsLoadBalancer(svc) {
 		return retSuccess
 	}
+	newNodes, err := listWithPredicates(c.nodeLister)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+		nodeSyncErrorCount.Inc()
+		return retNeedRetry
+	}
 	newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
-	oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
-
+	oldNodes := filterWithPredicates(c.getLastSyncedNodes(svc), getNodePredicatesForService(svc)...)
+	// Store last synced nodes without actually determining if we successfully
+	// synced them or not. Failed node syncs are passed off to retries in the
+	// service queue, so no need to wait. If we don't store it now, we risk
+	// re-syncing all LBs twice, one from another sync in the node sync and
+	// from the service sync
+	c.storeLastSyncedNodes(svc, newNodes)
 	if nodesSufficientlyEqual(oldNodes, newNodes) {
 		return retSuccess
 	}
-
 	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
 	if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
 		runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
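`nodesSufficientlyEqual` is defined elsewhere in this file and is not changed by this PR. As a simplified illustration of the compare step it performs, assuming (for this sketch only) that equality means the same node names in any order:

```go
package main

import "fmt"

// sameNodeNames is a simplified stand-in for nodesSufficientlyEqual, which
// compares more than names. If the filtered old and new node sets match,
// the load balancer hosts need no update and the sync returns early.
func sameNodeNames(prev, next []string) bool {
	if len(prev) != len(next) {
		return false
	}
	seen := make(map[string]struct{}, len(prev))
	for _, n := range prev {
		seen[n] = struct{}{}
	}
	for _, n := range next {
		if _, ok := seen[n]; !ok {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(sameNodeNames([]string{"a", "b"}, []string{"b", "a"})) // true
	fmt.Println(sameNodeNames([]string{"a"}, []string{"a", "c"}))      // false
}
```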
@@ -794,20 +811,12 @@ func nodesSufficientlyEqual(oldNodes, newNodes []*v1.Node) bool {
 func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)
 
-	// Include all nodes and let nodeSyncService filter and figure out if
-	// the update is relevant for the service in question.
-	nodes, err := listWithPredicates(c.nodeLister)
-	if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-		return serviceKeys(services)
-	}
-
 	// lock for servicesToRetry
 	servicesToRetry = sets.NewString()
 	lock := sync.Mutex{}
 
 	doWork := func(piece int) {
-		if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
+		if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
 			return
 		}
 		lock.Lock()
@@ -816,7 +825,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
 		servicesToRetry.Insert(key)
 	}
 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
-	c.lastSyncedNodes = nodes
 	klog.V(4).Infof("Finished updateLoadBalancerHosts")
 	return servicesToRetry
 }
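With node listing moved into `nodeSyncService`, `updateLoadBalancerHosts` is left with just the fan-out. A standalone sketch of that pattern, using `workqueue.ParallelizeUntil` and a mutex-guarded retry set (the failure rule below is invented purely for illustration):

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	items := []string{"default/a", "default/b", "default/c"}
	retry := map[string]struct{}{}
	var mu sync.Mutex // guards retry, written from multiple workers

	doWork := func(piece int) {
		// Pretend every odd-indexed piece failed and must be retried.
		if piece%2 == 0 {
			return
		}
		mu.Lock()
		defer mu.Unlock()
		retry[items[piece]] = struct{}{}
	}

	// Run doWork over all pieces with a fixed pool of 2 workers.
	workqueue.ParallelizeUntil(context.Background(), 2, len(items), doWork)
	fmt.Println(retry) // map[default/b:{}]
}
```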