Skip to content

Commit 85d8df4

Browse files
jimmy-zhroffe
authored and committed
Improve health check for cache synchronization (#498)
* improve Control flow logic * drop log,comment,sleep * update user-guide.md * set cache-sync-timeout default to 1m
1 parent e2ee6a7 commit 85d8df4

File tree

4 files changed

+73
-37
lines changed

4 files changed

+73
-37
lines changed

docs/user-guide.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ Usage of kube-router:
3131
--advertise-loadbalancer-ip Add LoadBalancer IP of service status as set by the LB provider to the RIB so that it gets advertised to the BGP peers.
3232
--advertise-pod-cidr Add Node's POD cidr to the RIB so that it gets advertised to the BGP peers. (default true)
3333
--bgp-graceful-restart Enables the BGP Graceful Restart capability so that routes are preserved on unexpected restarts
34+
--cache-sync-timeout duration The timeout for cache synchronization (e.g. '5s', '1m'). Must be greater than 0. (default 1m0s)
3435
--cleanup-config Cleanup iptables rules, ipvs, ipset configuration and exit.
3536
--cluster-asn uint ASN number under which cluster nodes will run iBGP.
3637
--cluster-cidr string CIDR range of pods in the cluster. It is used to identify traffic originating from and destined for pods.

pkg/cmd/kube-router.go

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"k8s.io/client-go/kubernetes"
2222
"k8s.io/client-go/rest"
2323
"k8s.io/client-go/tools/clientcmd"
24+
"time"
2425
)
2526

2627
// These get set at build time via -ldflags magic
@@ -76,24 +77,40 @@ func CleanupConfigAndExit() {
7677
func (kr *KubeRouter) Run() error {
7778
var err error
7879
var wg sync.WaitGroup
79-
8080
healthChan := make(chan *healthcheck.ControllerHeartbeat, 10)
8181
defer close(healthChan)
82-
8382
stopCh := make(chan struct{})
8483

84+
if !(kr.Config.RunFirewall || kr.Config.RunServiceProxy || kr.Config.RunRouter) {
85+
glog.Info("Router, Firewall or Service proxy functionality must be specified. Exiting!")
86+
os.Exit(0)
87+
}
88+
8589
hc, err := healthcheck.NewHealthController(kr.Config)
8690
if err != nil {
8791
return errors.New("Failed to create health controller: " + err.Error())
8892
}
8993
wg.Add(1)
90-
go hc.Run(healthChan, stopCh, &wg)
94+
go hc.RunServer(stopCh, &wg)
9195

92-
if !(kr.Config.RunFirewall || kr.Config.RunServiceProxy || kr.Config.RunRouter) {
93-
glog.Info("Router, Firewall or Service proxy functionality must be specified. Exiting!")
94-
os.Exit(0)
96+
informerFactory := informers.NewSharedInformerFactory(kr.Client, 0)
97+
svcInformer := informerFactory.Core().V1().Services().Informer()
98+
epInformer := informerFactory.Core().V1().Endpoints().Informer()
99+
podInformer := informerFactory.Core().V1().Pods().Informer()
100+
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
101+
nsInformer := informerFactory.Core().V1().Namespaces().Informer()
102+
npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer()
103+
informerFactory.Start(stopCh)
104+
105+
err = kr.CacheSyncOrTimeout(informerFactory, stopCh)
106+
if err != nil {
107+
return errors.New("Failed to synchronize cache: " + err.Error())
95108
}
96109

110+
hc.SetAlive()
111+
wg.Add(1)
112+
go hc.RunCheck(healthChan, stopCh, &wg)
113+
97114
if (kr.Config.MetricsPort > 0) && (kr.Config.MetricsPort <= 65535) {
98115
kr.Config.MetricsEnabled = true
99116
mc, err := metrics.NewMetricsController(kr.Client, kr.Config)
@@ -110,18 +127,6 @@ func (kr *KubeRouter) Run() error {
110127
kr.Config.MetricsEnabled = false
111128
}
112129

113-
informerFactory := informers.NewSharedInformerFactory(kr.Client, 0)
114-
115-
svcInformer := informerFactory.Core().V1().Services().Informer()
116-
epInformer := informerFactory.Core().V1().Endpoints().Informer()
117-
podInformer := informerFactory.Core().V1().Pods().Informer()
118-
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
119-
nsInformer := informerFactory.Core().V1().Namespaces().Informer()
120-
npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer()
121-
122-
informerFactory.Start(stopCh)
123-
informerFactory.WaitForCacheSync(stopCh)
124-
125130
if kr.Config.RunFirewall {
126131
npc, err := netpol.NewNetworkPolicyController(kr.Client,
127132
kr.Config, podInformer, npInformer, nsInformer)
@@ -177,6 +182,22 @@ func (kr *KubeRouter) Run() error {
177182
return nil
178183
}
179184

185+
// CacheSyncOrTimeout waits for the informer caches to synchronize and returns an error if that takes longer than kr.Config.CacheSyncTimeout.
186+
func (kr *KubeRouter) CacheSyncOrTimeout(informerFactory informers.SharedInformerFactory, stopCh <-chan struct{}) error {
187+
syncOverCh := make(chan struct{})
188+
go func() {
189+
informerFactory.WaitForCacheSync(stopCh)
190+
close(syncOverCh)
191+
}()
192+
193+
select {
194+
case <-time.After(kr.Config.CacheSyncTimeout):
195+
return errors.New(kr.Config.CacheSyncTimeout.String() + " timeout")
196+
case <-syncOverCh:
197+
return nil
198+
}
199+
}
200+
180201
func PrintVersion(logOutput bool) {
181202
output := fmt.Sprintf("Running %v version %s, built on %s, %s\n", os.Args[0], version, buildDate, runtime.Version())
182203

pkg/healthcheck/health_controller.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ func (hc *HealthController) CheckHealth() bool {
107107
graceTime := time.Duration(1500 * time.Millisecond)
108108

109109
if hc.Config.RunFirewall {
110-
111110
if time.Since(hc.Status.NetworkPolicyControllerAlive) > hc.Config.IPTablesSyncPeriod+hc.Status.NetworkPolicyControllerAliveTTL+graceTime {
112111
glog.Error("Network Policy Controller heartbeat missed")
113112
health = false
@@ -138,16 +137,11 @@ func (hc *HealthController) CheckHealth() bool {
138137
return health
139138
}
140139

141-
//Run starts the HealthController
142-
func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
143-
t := time.NewTicker(5000 * time.Millisecond)
140+
//RunServer starts the HealthController's server
141+
func (hc *HealthController) RunServer(stopCh <-chan struct{}, wg *sync.WaitGroup) error {
144142
defer wg.Done()
145-
glog.Info("Starting health controller")
146-
147143
srv := &http.Server{Addr: ":" + strconv.Itoa(int(hc.HealthPort)), Handler: http.DefaultServeMux}
148-
149144
http.HandleFunc("/healthz", hc.Handler)
150-
151145
if (hc.Config.HealthPort > 0) && (hc.Config.HealthPort <= 65535) {
152146
hc.HTTPEnabled = true
153147
go func() {
@@ -162,15 +156,26 @@ func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <
162156
hc.HTTPEnabled = false
163157
}
164158

159+
select {
160+
case <-stopCh:
161+
glog.Infof("Shutting down health controller")
162+
if hc.HTTPEnabled {
163+
if err := srv.Shutdown(context.Background()); err != nil {
164+
glog.Errorf("could not shutdown: %v", err)
165+
}
166+
}
167+
return nil
168+
}
169+
}
170+
171+
//RunCheck starts the HealthController's check
172+
func (hc *HealthController) RunCheck(healthChan <-chan *ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
173+
t := time.NewTicker(5000 * time.Millisecond)
174+
defer wg.Done()
165175
for {
166176
select {
167177
case <-stopCh:
168-
glog.Infof("Shutting down health controller")
169-
if hc.HTTPEnabled {
170-
if err := srv.Shutdown(context.Background()); err != nil {
171-
glog.Errorf("could not shutdown: %v", err)
172-
}
173-
}
178+
glog.Infof("Shutting down HealthController RunCheck")
174179
return nil
175180
case heartbeat := <-healthChan:
176181
hc.HandleHeartbeat(heartbeat)
@@ -179,7 +184,16 @@ func (hc *HealthController) Run(healthChan <-chan *ControllerHeartbeat, stopCh <
179184
}
180185
hc.Status.Healthy = hc.CheckHealth()
181186
}
187+
}
188+
189+
func (hc *HealthController) SetAlive() {
190+
191+
now := time.Now()
182192

193+
hc.Status.MetricsControllerAlive = now
194+
hc.Status.NetworkPolicyControllerAlive = now
195+
hc.Status.NetworkRoutingControllerAlive = now
196+
hc.Status.NetworkServicesControllerAlive = now
183197
}
184198

185199
//NewHealthController creates a new health controller and returns a reference to it
@@ -188,11 +202,7 @@ func NewHealthController(config *options.KubeRouterConfig) (*HealthController, e
188202
Config: config,
189203
HealthPort: config.HealthPort,
190204
Status: HealthStats{
191-
Healthy: true,
192-
MetricsControllerAlive: time.Now(),
193-
NetworkPolicyControllerAlive: time.Now(),
194-
NetworkRoutingControllerAlive: time.Now(),
195-
NetworkServicesControllerAlive: time.Now(),
205+
Healthy: true,
196206
},
197207
}
198208
return &hc, nil

pkg/options/options.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type KubeRouterConfig struct {
1313
AdvertiseNodePodCidr bool
1414
AdvertiseLoadBalancerIp bool
1515
BGPGracefulRestart bool
16+
CacheSyncTimeout time.Duration
1617
CleanupConfig bool
1718
ClusterAsn uint
1819
ClusterCIDR string
@@ -50,6 +51,7 @@ type KubeRouterConfig struct {
5051

5152
func NewKubeRouterConfig() *KubeRouterConfig {
5253
return &KubeRouterConfig{
54+
CacheSyncTimeout: 1 * time.Minute,
5355
IpvsSyncPeriod: 5 * time.Minute,
5456
IPTablesSyncPeriod: 5 * time.Minute,
5557
RoutesSyncPeriod: 5 * time.Minute,
@@ -62,6 +64,8 @@ func (s *KubeRouterConfig) AddFlags(fs *pflag.FlagSet) {
6264
"Print usage information.")
6365
fs.BoolVarP(&s.Version, "version", "V", false,
6466
"Print version information.")
67+
fs.DurationVar(&s.CacheSyncTimeout, "cache-sync-timeout", s.CacheSyncTimeout,
68+
"The timeout for cache synchronization (e.g. '5s', '1m'). Must be greater than 0.")
6569
fs.BoolVar(&s.RunServiceProxy, "run-service-proxy", true,
6670
"Enables Service Proxy -- sets up IPVS for Kubernetes Services.")
6771
fs.BoolVar(&s.RunFirewall, "run-firewall", true,

0 commit comments

Comments
 (0)