Skip to content

Commit b48e89b

Browse files
Merge pull request #60 from daemon1024/fix-kubearmor-reconn
fix(server): keep retrying connections till node/pod is deleted
2 parents 90cf7ba + 68c68b5 commit b48e89b

File tree

3 files changed

+100
-82
lines changed

3 files changed

+100
-82
lines changed

relay-server/elasticsearch/adapter.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,17 +197,17 @@ func (ecl *ElasticsearchClient) PrintBulkStats() {
197197
if biStats.NumFailed > 0 {
198198
fmt.Printf(
199199
"Indexed [%s] documents with [%s] errors in %s (%s docs/sec)",
200-
humanize.Comma(int64(biStats.NumFlushed)),
201-
humanize.Comma(int64(biStats.NumFailed)),
200+
humanize.Commaf(float64(biStats.NumFlushed)),
201+
humanize.Commaf(float64(biStats.NumFailed)),
202202
dur.Truncate(time.Millisecond),
203-
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
203+
humanize.Commaf(float64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
204204
)
205205
} else {
206206
log.Printf(
207207
"Sucessfuly indexed [%s] documents in %s (%s docs/sec)",
208-
humanize.Comma(int64(biStats.NumFlushed)),
208+
humanize.Commaf(float64(biStats.NumFlushed)),
209209
dur.Truncate(time.Millisecond),
210-
humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
210+
humanize.Commaf(float64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
211211
)
212212
}
213213
println(strings.Repeat("▔", 80))

relay-server/server/k8sHandler.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -259,18 +259,38 @@ func (kh *K8sHandler) getKaPodInformer(ipsChan chan string) cache.SharedIndexInf
259259
_, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
260260
AddFunc: func(obj interface{}) {
261261
pod, ok := obj.(*corev1.Pod)
262-
if ok {
263-
if pod.Status.PodIP != "" {
264-
ipsChan <- pod.Status.PodIP
265-
}
262+
if !ok {
263+
return
264+
}
265+
266+
if pod.Status.PodIP != "" {
267+
ipsChan <- pod.Status.PodIP
266268
}
267269
},
268270
UpdateFunc: func(old, new interface{}) {
271+
oldPod, ok := old.(*corev1.Pod)
272+
if !ok {
273+
return
274+
}
275+
269276
newPod, ok := new.(*corev1.Pod)
270-
if ok {
271-
if newPod.Status.PodIP != "" {
272-
ipsChan <- newPod.Status.PodIP
273-
}
277+
if !ok {
278+
return
279+
}
280+
281+
if newPod.Status.PodIP != "" && newPod.Status.PodIP != oldPod.Status.PodIP {
282+
ipsChan <- newPod.Status.PodIP
283+
DeleteClientEntry(oldPod.Status.PodIP)
284+
}
285+
},
286+
DeleteFunc: func(obj interface{}) {
287+
pod, ok := obj.(*corev1.Pod)
288+
if !ok {
289+
return
290+
}
291+
292+
if pod.Status.PodIP != "" {
293+
DeleteClientEntry(pod.Status.PodIP)
274294
}
275295
},
276296
})

relay-server/server/relayServer.go

Lines changed: 67 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,12 @@ var Running bool
3838
var ClientList map[string]int
3939

4040
// ClientListLock Lock
41-
var ClientListLock *sync.Mutex
41+
var ClientListLock *sync.RWMutex
4242

4343
func init() {
4444
Running = true
4545
ClientList = map[string]int{}
46-
ClientListLock = &sync.Mutex{}
47-
46+
ClientListLock = &sync.RWMutex{}
4847
}
4948

5049
// ========== //
@@ -706,11 +705,7 @@ func DeleteClientEntry(nodeIP string) {
706705
ClientListLock.Lock()
707706
defer ClientListLock.Unlock()
708707

709-
_, exists := ClientList[nodeIP]
710-
711-
if exists {
712-
delete(ClientList, nodeIP)
713-
}
708+
delete(ClientList, nodeIP)
714709
}
715710

716711
// =============== //
@@ -722,66 +717,72 @@ func connectToKubeArmor(nodeIP, port string) error {
722717
// create connection info
723718
server := nodeIP + ":" + port
724719

725-
defer DeleteClientEntry(nodeIP)
720+
for Running {
721+
ClientListLock.RLock()
722+
_, found := ClientList[nodeIP]
723+
ClientListLock.RUnlock()
724+
if !found {
725+
// KubeArmor with this IP is deleted or the IP has changed
726+
// parent function will spawn a new goroutine accordingly
727+
break
728+
}
726729

727-
// create a client
728-
client := NewClient(server)
729-
if client == nil {
730-
return nil
731-
}
730+
// create a client
731+
client := NewClient(server)
732+
if client == nil {
733+
time.Sleep(5 * time.Second) // wait for 5 second before retrying
734+
continue
735+
}
732736

733-
// do healthcheck
734-
if ok := client.DoHealthCheck(); !ok {
735-
kg.Warnf("Failed to check the liveness of KubeArmor's gRPC service (%s)", server)
736-
return nil
737-
}
738-
kg.Printf("Checked the liveness of KubeArmor's gRPC service (%s)", server)
737+
// do healthcheck
738+
if ok := client.DoHealthCheck(); !ok {
739+
kg.Warnf("Failed to check the liveness of KubeArmor's gRPC service (%s)", server)
740+
time.Sleep(5 * time.Second) // wait for 5 second before retrying
741+
continue
742+
}
743+
kg.Printf("Checked the liveness of KubeArmor's gRPC service (%s)", server)
739744

740-
var wg sync.WaitGroup
741-
stop := make(chan struct{})
742-
errCh := make(chan error, 1)
745+
var wg sync.WaitGroup
746+
stop := make(chan struct{})
747+
errCh := make(chan error, 1)
743748

744-
// Start watching messages
745-
wg.Add(1)
746-
go func() {
747-
client.WatchMessages(&wg, stop, errCh)
748-
}()
749-
kg.Print("Started to watch messages from " + server)
749+
// Start watching messages
750+
wg.Add(1)
751+
go client.WatchMessages(&wg, stop, errCh)
752+
kg.Print("Started to watch messages from " + server)
750753

751-
// Start watching alerts
752-
wg.Add(1)
753-
go func() {
754-
client.WatchAlerts(&wg, stop, errCh)
755-
}()
756-
kg.Print("Started to watch alerts from " + server)
754+
// Start watching alerts
755+
wg.Add(1)
756+
go client.WatchAlerts(&wg, stop, errCh)
757+
kg.Print("Started to watch alerts from " + server)
757758

758-
// Start watching logs
759-
wg.Add(1)
760-
go func() {
761-
client.WatchLogs(&wg, stop, errCh)
762-
}()
763-
kg.Print("Started to watch logs from " + server)
764-
765-
// Wait for an error or all goroutines to finish
766-
select {
767-
case err := <-errCh:
768-
close(stop) // Stop other goroutines
769-
kg.Warn(err.Error())
770-
case <-func() chan struct{} {
771-
done := make(chan struct{})
772-
go func() {
773-
wg.Wait()
774-
close(done)
775-
}()
776-
return done
777-
}():
778-
// All goroutines finished without error
779-
}
759+
// Start watching logs
760+
wg.Add(1)
761+
go client.WatchLogs(&wg, stop, errCh)
762+
kg.Print("Started to watch logs from " + server)
780763

781-
if err := client.DestroyClient(); err != nil {
782-
kg.Warnf("Failed to destroy the client (%s) %s", server, err.Error())
764+
// Wait for an error or all goroutines to finish
765+
select {
766+
case err := <-errCh:
767+
close(stop) // Stop other goroutines
768+
kg.Warn(err.Error())
769+
case <-func() chan struct{} {
770+
done := make(chan struct{})
771+
go func() {
772+
wg.Wait()
773+
close(done)
774+
}()
775+
return done
776+
}():
777+
// All goroutines finished without error
778+
}
779+
780+
if err := client.DestroyClient(); err != nil {
781+
kg.Warnf("Failed to destroy the client (%s) %s", server, err.Error())
782+
}
783+
784+
kg.Printf("Destroyed the client (%s)", server)
783785
}
784-
kg.Printf("Destroyed the client (%s)", server)
785786

786787
return nil
787788
}
@@ -810,16 +811,13 @@ func (rs *RelayServer) GetFeedsFromNodes() {
810811
}
811812

812813
for Running {
813-
select {
814-
case ip := <-ipsChan:
815-
ClientListLock.Lock()
816-
if _, ok := ClientList[ip]; !ok {
817-
ClientList[ip] = 1
818-
go connectToKubeArmor(ip, rs.Port)
819-
}
820-
ClientListLock.Unlock()
814+
ip := <-ipsChan
815+
ClientListLock.Lock()
816+
if _, ok := ClientList[ip]; !ok {
817+
ClientList[ip] = 1
818+
go connectToKubeArmor(ip, rs.Port)
821819
}
822-
time.Sleep(10 * time.Second)
820+
ClientListLock.Unlock()
823821
}
824822
}
825823
}

0 commit comments

Comments
 (0)