Skip to content

Commit a1ecedf

Browse files
authored
prevent processing updates leading to sync when controller doing full sync at boot time (#400)
1 parent 041c055 commit a1ecedf

File tree

3 files changed

+45
-19
lines changed

3 files changed

+45
-19
lines changed

pkg/controllers/network_policy_controller.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type NetworkPolicyController struct {
4949
syncPeriod time.Duration
5050
MetricsEnabled bool
5151
v1NetworkPolicy bool
52+
readyForUpdates bool
5253

5354
// list of all active network policies expressed as networkPolicyInfo
5455
networkPoliciesInfo *[]networkPolicyInfo
@@ -137,7 +138,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat,
137138
} else {
138139
sendHeartBeat(healthChan, "NPC")
139140
}
140-
141+
npc.readyForUpdates = true
141142
select {
142143
case <-stopCh:
143144
glog.Infof("Shutting down network policies controller")
@@ -152,6 +153,11 @@ func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
152153
pod := obj.(*api.Pod)
153154
glog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name)
154155

156+
if !npc.readyForUpdates {
157+
glog.V(3).Infof("Skipping update to pod: %s/%s, controller still performing bootup full-sync", pod.Namespace, pod.Name)
158+
return
159+
}
160+
155161
err := npc.Sync()
156162
if err != nil {
157163
glog.Errorf("Error syncing network policy for the update to pod: %s/%s Error: %s", pod.Namespace, pod.Name, err)
@@ -162,6 +168,12 @@ func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
162168
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
163169
netpol := obj.(*networking.NetworkPolicy)
164170
glog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name)
171+
172+
if !npc.readyForUpdates {
173+
glog.V(3).Infof("Skipping update to network policy: %s/%s, controller still performing bootup full-sync", netpol.Namespace, netpol.Name)
174+
return
175+
}
176+
165177
err := npc.Sync()
166178
if err != nil {
167179
glog.Errorf("Error syncing network policy for the update to network policy: %s/%s Error: %s", netpol.Namespace, netpol.Name, err)
@@ -196,7 +208,7 @@ func (npc *NetworkPolicyController) Sync() error {
196208
if npc.MetricsEnabled {
197209
controllerIptablesSyncTime.WithLabelValues().Set(float64(endTime))
198210
}
199-
glog.V(2).Infof("sync iptables took %v", endTime)
211+
glog.V(1).Infof("sync iptables took %v", endTime)
200212
}()
201213

202214
glog.V(1).Info("Starting periodic sync of iptables")
@@ -1414,8 +1426,12 @@ func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHand
14141426

14151427
},
14161428
UpdateFunc: func(oldObj, newObj interface{}) {
1417-
npc.OnPodUpdate(newObj)
1418-
1429+
newPoObj := newObj.(*api.Pod)
1430+
oldPoObj := oldObj.(*api.Pod)
1431+
if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP {
1432+
// for the network policies, we are only interested in pod status phase change or IP change
1433+
npc.OnPodUpdate(newObj)
1434+
}
14191435
},
14201436
DeleteFunc: func(obj interface{}) {
14211437
npc.OnPodUpdate(obj)

pkg/controllers/network_routes_controller.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,17 +1429,18 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(obj interface{}) {
14291429
}
14301430

14311431
func (nrc *NetworkRoutingController) OnServiceUpdate(obj interface{}) {
1432-
if !nrc.bgpServerStarted {
1433-
return
1434-
}
1435-
14361432
svc, ok := obj.(*v1core.Service)
14371433
if !ok {
14381434
glog.Errorf("cache indexer returned obj that is not type *v1.Service")
14391435
return
14401436
}
14411437

14421438
glog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name)
1439+
if !nrc.bgpServerStarted {
1440+
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
1441+
return
1442+
}
1443+
14431444
toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true)
14441445
if err != nil {
14451446
glog.Errorf("error getting routes for service: %s, err: %s", svc.Name, err)
@@ -1483,10 +1484,6 @@ func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) {
14831484
}
14841485

14851486
func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
1486-
if !nrc.bgpServerStarted {
1487-
return
1488-
}
1489-
14901487
ep, ok := obj.(*v1core.Endpoints)
14911488
if !ok {
14921489
glog.Errorf("cache indexer returned obj that is not type *v1.Endpoints")
@@ -1497,6 +1494,12 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
14971494
return
14981495
}
14991496

1497+
glog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name)
1498+
if !nrc.bgpServerStarted {
1499+
glog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name)
1500+
return
1501+
}
1502+
15001503
svc, err := nrc.serviceForEndpoints(ep)
15011504
if err != nil {
15021505
glog.Errorf("failed to convert endpoints resource to service: %s", err)

pkg/controllers/network_services_controller.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ type NetworkServicesController struct {
169169
nodeportBindOnAllIp bool
170170
MetricsEnabled bool
171171
ln LinuxNetworking
172+
readyForUpdates bool
172173

173174
svcLister cache.Indexer
174175
epLister cache.Indexer
@@ -246,7 +247,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *ControllerHeartbeat
246247
} else {
247248
sendHeartBeat(healthChan, "NSC")
248249
}
249-
250+
nsc.readyForUpdates = true
250251
select {
251252
case <-stopCh:
252253
glog.Info("Shutting down network services controller")
@@ -350,9 +351,6 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM
350351

351352
// OnEndpointsUpdate handle change in endpoints update from the API server
352353
func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
353-
nsc.mu.Lock()
354-
defer nsc.mu.Unlock()
355-
356354
ep, ok := obj.(*api.Endpoints)
357355
if !ok {
358356
glog.Error("could not convert endpoints update object to *v1.Endpoints")
@@ -364,6 +362,12 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
364362
}
365363

366364
glog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name)
365+
if !nsc.readyForUpdates {
366+
glog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name)
367+
return
368+
}
369+
nsc.mu.Lock()
370+
defer nsc.mu.Unlock()
367371

368372
// build new service and endpoints map to reflect the change
369373
newServiceMap := nsc.buildServicesInfo()
@@ -381,16 +385,19 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
381385

382386
// OnServiceUpdate handle change in service update from the API server
383387
func (nsc *NetworkServicesController) OnServiceUpdate(obj interface{}) {
384-
nsc.mu.Lock()
385-
defer nsc.mu.Unlock()
386-
387388
svc, ok := obj.(*api.Service)
388389
if !ok {
389390
glog.Error("could not convert service update object to *v1.Service")
390391
return
391392
}
392393

393394
glog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name)
395+
if !nsc.readyForUpdates {
396+
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
397+
return
398+
}
399+
nsc.mu.Lock()
400+
defer nsc.mu.Unlock()
394401

395402
// build new service and endpoints map to reflect the change
396403
newServiceMap := nsc.buildServicesInfo()

0 commit comments

Comments
 (0)