Skip to content

Commit 4c81c29

Browse files
authored
Merge pull request #230 from chengxiangdong/feat_class
Fix thread pool issue
2 parents 4658721 + 0b7f916 commit 4c81c29

File tree

2 files changed

+94
-50
lines changed

2 files changed

+94
-50
lines changed

pkg/cloudprovider/huaweicloud/huaweicloud.go

Lines changed: 30 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -640,22 +640,24 @@ func IsPodActive(p v1.Pod) bool {
640640

641641
// LoadBalancerServiceListener watches Endpoints and Services and drives
// load-balancer reconciliation for them.
type LoadBalancerServiceListener struct {
	Basic
	kubeClient  *corev1.CoreV1Client // client used to list/watch Endpoints and Services
	stopChannel chan struct{}        // buffered (1); signaled and closed by stopListenerSlice

	goroutinePool       *common.ExecutePool // bounded worker pool (size 5) replacing the previous ad-hoc per-event goroutines
	serviceCache        map[string]*v1.Service
	invalidServiceCache *gocache.Cache // "namespace/name" -> last error; suppresses repeated lookups of missing services
}
649650

650651
// stopListenerSlice tells the listener goroutines to shut down by
// closing the stop channel.
func (e *LoadBalancerServiceListener) stopListenerSlice() {
	klog.Warningf("Stop listening to Endpoints")
	// Closing the channel broadcasts to every receiver (a receive on a
	// closed channel completes immediately), so the explicit
	// `e.stopChannel <- struct{}{}` the original performed first was
	// redundant and removed.
	// NOTE(review): calling this twice still panics (double close);
	// guard at the caller or add a sync.Once if repeated shutdown is possible.
	close(e.stopChannel)
}
654656

655-
var queue = make(chan v1.Service, 3)
656-
657657
func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Service, bool)) {
658658
klog.Infof("starting EndpointListener")
659+
e.goroutinePool.Start()
660+
659661
for {
660662
endpointsList, err := e.kubeClient.Endpoints(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{Limit: 1})
661663

@@ -693,17 +695,11 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv
693695
return
694696
}
695697
klog.V(6).Infof("New Endpoints added, namespace: %s, name: %s", newEndpoint.Namespace, newEndpoint.Name)
696-
queue <- v1.Service{
697-
ObjectMeta: metav1.ObjectMeta{
698-
Namespace: newEndpoint.Namespace,
699-
Name: newEndpoint.Name,
700-
},
701-
}
702-
go func() {
703-
s := <-queue
704-
klog.V(6).Infof("process endpoints: %s / %s", s.Namespace, s.Name)
705-
e.dispatcher(s.Namespace, s.Name, endpointAdded, handle)
706-
}()
698+
699+
e.goroutinePool.Submit(func() {
700+
klog.V(6).Infof("process endpoints: %s / %s", newEndpoint.Namespace, newEndpoint.Name)
701+
e.dispatcher(newEndpoint.Namespace, newEndpoint.Name, endpointAdded, handle)
702+
})
707703
},
708704
UpdateFunc: func(oldObj, newObj interface{}) {
709705
newEndpoint := newObj.(*v1.Endpoints)
@@ -713,17 +709,10 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv
713709
}
714710
klog.V(6).Infof("Endpoint update, namespace: %s, name: %s", newEndpoint.Namespace, newEndpoint.Name)
715711

716-
queue <- v1.Service{
717-
ObjectMeta: metav1.ObjectMeta{
718-
Namespace: newEndpoint.Namespace,
719-
Name: newEndpoint.Name,
720-
},
721-
}
722-
go func() {
723-
s := <-queue
724-
klog.V(6).Infof("process endpoints: %s / %s", s.Namespace, s.Name)
725-
e.dispatcher(s.Namespace, s.Name, endpointUpdate, handle)
726-
}()
712+
e.goroutinePool.Submit(func() {
713+
klog.V(6).Infof("process endpoints: %s / %s", newEndpoint.Namespace, newEndpoint.Name)
714+
e.dispatcher(newEndpoint.Namespace, newEndpoint.Name, endpointUpdate, handle)
715+
})
727716
},
728717
DeleteFunc: func(obj interface{}) {},
729718
}, 5*time.Second)
@@ -762,17 +751,11 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv
762751
}
763752

764753
klog.Infof("Found service was updated, namespace: %s, name: %s", svs.Namespace, svs.Name)
765-
queue <- v1.Service{
766-
ObjectMeta: metav1.ObjectMeta{
767-
Namespace: svs.Namespace,
768-
Name: svs.Name,
769-
},
770-
}
771-
go func() {
772-
s := <-queue
773-
klog.V(4).Infof("process endpoints: %s / %s", s.Namespace, s.Name)
754+
755+
e.goroutinePool.Submit(func() {
756+
klog.V(4).Infof("process endpoints: %s / %s", svs.Namespace, svs.Name)
774757
handle(svs, false)
775-
}()
758+
})
776759
},
777760
DeleteFunc: func(obj interface{}) {
778761
svs, _ := obj.(*v1.Service)
@@ -781,17 +764,10 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv
781764
}
782765

783766
klog.Infof("Found service was deleted, namespace: %s, name: %s", svs.Namespace, svs.Name)
784-
queue <- v1.Service{
785-
ObjectMeta: metav1.ObjectMeta{
786-
Namespace: svs.Namespace,
787-
Name: svs.Name,
788-
},
789-
}
790-
go func() {
791-
s := <-queue
792-
klog.V(4).Infof("process endpoints: %s / %s", s.Namespace, s.Name)
767+
e.goroutinePool.Submit(func() {
768+
klog.V(4).Infof("process endpoints: %s / %s", svs.Namespace, svs.Name)
793769
handle(svs, true)
794-
}()
770+
})
795771
},
796772
}, 5*time.Second)
797773
if err != nil {
@@ -808,15 +784,16 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv
808784

809785
func (e *LoadBalancerServiceListener) dispatcher(namespace, name, eType string, handle func(*v1.Service, bool)) {
810786
key := fmt.Sprintf("%s/%s", namespace, name)
811-
if _, ok := e.invalidServiceCache.Get(key); ok {
787+
if v, ok := e.invalidServiceCache.Get(key); ok {
788+
klog.V(6).Infof("Service %s/%s not found, will not try again within 10 minutes: %s", namespace, name, v)
812789
return
813790
}
814791

815792
svc, err := e.kubeClient.Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
816793
if err != nil {
817794
klog.Errorf("failed to query service, error: %s", err)
818795
if strings.Contains(err.Error(), "not found") {
819-
e.invalidServiceCache.Set(key, "", gocache.DefaultExpiration)
796+
e.invalidServiceCache.Set(key, err.Error(), 10*time.Minute)
820797
}
821798
return
822799
}
@@ -902,10 +879,12 @@ func (e *LoadBalancerServiceListener) autoRemoveHealthCheckRule(handle func(node
902879

903880
func (h *CloudProvider) listenerDeploy() error {
904881
listener := LoadBalancerServiceListener{
905-
Basic: h.Basic,
882+
Basic: h.Basic,
883+
kubeClient: h.kubeClient,
884+
stopChannel: make(chan struct{}, 1),
906885

907-
kubeClient: h.kubeClient,
908-
serviceCache: make(map[string]*v1.Service, 0),
886+
goroutinePool: common.NewExecutePool(5),
887+
serviceCache: make(map[string]*v1.Service),
909888
invalidServiceCache: gocache.New(5*time.Minute, 10*time.Minute),
910889
}
911890

@@ -979,6 +958,7 @@ func (h *CloudProvider) listenerDeploy() error {
979958
})
980959
}, func() {
981960
listener.stopListenerSlice()
961+
listener.goroutinePool.Stop()
982962
})
983963
return nil
984964
}

pkg/common/common.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ limitations under the License.
1717
package common
1818

1919
import (
20+
"k8s.io/klog/v2"
21+
"os"
22+
"os/signal"
23+
"syscall"
2024
"time"
2125

2226
"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/sdkerr"
@@ -53,3 +57,63 @@ func WaitForCompleted(condition wait.ConditionFunc) error {
5357
}
5458
return wait.ExponentialBackoff(backoff, condition)
5559
}
60+
61+
// JobHandle is a unit of work executed by an ExecutePool worker.
type JobHandle func()

// ExecutePool is a fixed-size goroutine pool that executes submitted
// jobs from a buffered queue. Start launches the workers; Stop shuts
// them down.
type ExecutePool struct {
	workerNum int            // number of worker goroutines launched by Start
	queueCh   chan JobHandle // buffered job queue consumed by the workers
	stopCh    chan struct{}  // created by Start; closed to stop the workers
}

// NewExecutePool returns a pool that will run up to size workers.
// The job queue is buffered (2000 entries), so Submit blocks only when
// the queue is full. No workers run until Start is called.
func NewExecutePool(size int) *ExecutePool {
	pool := &ExecutePool{
		workerNum: size,
		queueCh:   make(chan JobHandle, 2000),
	}
	return pool
}
74+
75+
func (w *ExecutePool) Start() {
76+
// Make sure it is not started repeatedly.
77+
if w.stopCh != nil {
78+
w.stopCh <- struct{}{}
79+
close(w.stopCh)
80+
}
81+
stopCh := make(chan struct{}, 1)
82+
w.stopCh = stopCh
83+
84+
for i := 0; i < w.workerNum; i++ {
85+
klog.Infof("start goroutine pool handler: %v/%v", i, w.workerNum)
86+
go func() {
87+
for {
88+
select {
89+
case handler, ok := <-w.queueCh:
90+
if !ok {
91+
klog.Errorf("goroutine pool exiting")
92+
return
93+
}
94+
handler()
95+
case <-stopCh:
96+
klog.Info("goroutine pool stopping")
97+
return
98+
}
99+
}
100+
}()
101+
}
102+
103+
go func() {
104+
exit := make(chan os.Signal, 1)
105+
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)
106+
<-exit
107+
w.Stop()
108+
}()
109+
}
110+
111+
func (w *ExecutePool) Stop() {
112+
w.stopCh <- struct{}{}
113+
close(w.stopCh)
114+
close(w.queueCh)
115+
}
116+
117+
// Submit enqueues work for execution by one of the pool's workers.
// It blocks while the buffered queue (2000 entries) is full, and it
// panics if called after Stop, because Stop closes queueCh and sending
// on a closed channel panics.
func (w *ExecutePool) Submit(work JobHandle) {
	w.queueCh <- work
}

0 commit comments

Comments (0)