Skip to content

Commit fa8d69e

Browse files
committed
fix: add locking around ipset invocations
1 parent e8a81f3 commit fa8d69e

File tree

6 files changed

+61
-11
lines changed

6 files changed

+61
-11
lines changed

pkg/cmd/kube-router.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func CleanupConfigAndExit() {
7272
// Run starts the controllers and waits forever till we get SIGINT or SIGTERM
7373
func (kr *KubeRouter) Run() error {
7474
var err error
75+
var ipsetMutex sync.Mutex
7576
var wg sync.WaitGroup
7677
healthChan := make(chan *healthcheck.ControllerHeartbeat, 10)
7778
defer close(healthChan)
@@ -140,7 +141,8 @@ func (kr *KubeRouter) Run() error {
140141
}
141142

142143
if kr.Config.RunRouter {
143-
nrc, err := routing.NewNetworkRoutingController(kr.Client, kr.Config, nodeInformer, svcInformer, epInformer)
144+
nrc, err := routing.NewNetworkRoutingController(kr.Client, kr.Config,
145+
nodeInformer, svcInformer, epInformer, &ipsetMutex)
144146
if err != nil {
145147
return errors.New("Failed to create network routing controller: " + err.Error())
146148
}
@@ -162,7 +164,7 @@ func (kr *KubeRouter) Run() error {
162164

163165
if kr.Config.RunServiceProxy {
164166
nsc, err := proxy.NewNetworkServicesController(kr.Client, kr.Config,
165-
svcInformer, epInformer, podInformer)
167+
svcInformer, epInformer, podInformer, &ipsetMutex)
166168
if err != nil {
167169
return errors.New("Failed to create network services controller: " + err.Error())
168170
}
@@ -183,7 +185,7 @@ func (kr *KubeRouter) Run() error {
183185

184186
if kr.Config.RunFirewall {
185187
npc, err := netpol.NewNetworkPolicyController(kr.Client,
186-
kr.Config, podInformer, npInformer, nsInformer)
188+
kr.Config, podInformer, npInformer, nsInformer, &ipsetMutex)
187189
if err != nil {
188190
return errors.New("Failed to create network policy controller: " + err.Error())
189191
}

pkg/controllers/netpol/network_policy_controller.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type NetworkPolicyController struct {
5757
MetricsEnabled bool
5858
healthChan chan<- *healthcheck.ControllerHeartbeat
5959
fullSyncRequestChan chan struct{}
60+
ipsetMutex *sync.Mutex
6061

6162
ipSetHandler *utils.IPSet
6263

@@ -567,6 +568,13 @@ func (npc *NetworkPolicyController) Cleanup() {
567568
}
568569

569570
// delete all ipsets
571+
klog.V(1).Infof("Attempting to attain ipset mutex lock")
572+
npc.ipsetMutex.Lock()
573+
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
574+
defer func() {
575+
npc.ipsetMutex.Unlock()
576+
klog.V(1).Infof("Returned ipset mutex lock")
577+
}()
570578
ipset, err := utils.NewIPSet(false)
571579
if err != nil {
572580
klog.Errorf("Failed to clean up ipsets: " + err.Error())
@@ -586,8 +594,8 @@ func (npc *NetworkPolicyController) Cleanup() {
586594
// NewNetworkPolicyController returns new NetworkPolicyController object
587595
func NewNetworkPolicyController(clientset kubernetes.Interface,
588596
config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
589-
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
590-
npc := NetworkPolicyController{}
597+
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkPolicyController, error) {
598+
npc := NetworkPolicyController{ipsetMutex: ipsetMutex}
591599

592600
// Creating a single-item buffered channel to ensure that we only keep a single full sync request at a time,
593601
// additional requests would be pointless to queue since after the first one was processed the system would already

pkg/controllers/netpol/network_policy_controller_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"net"
66
"strings"
7+
"sync"
78
"testing"
89
"time"
910

@@ -522,7 +523,7 @@ func TestNetworkPolicyController(t *testing.T) {
522523
_, podInformer, nsInformer, netpolInformer := newFakeInformersFromClient(client)
523524
for _, test := range testCases {
524525
t.Run(test.name, func(t *testing.T) {
525-
_, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer)
526+
_, err := NewNetworkPolicyController(client, test.config, podInformer, netpolInformer, nsInformer, &sync.Mutex{})
526527
if err == nil && test.expectError {
527528
t.Error("This config should have failed, but it was successful instead")
528529
} else if err != nil {

pkg/controllers/netpol/policy.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,14 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(networkPoliciesInfo
7676
klog.V(2).Infof("Syncing network policy chains took %v", endTime)
7777
}()
7878

79+
klog.V(1).Infof("Attempting to attain ipset mutex lock")
80+
npc.ipsetMutex.Lock()
81+
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
82+
defer func() {
83+
npc.ipsetMutex.Unlock()
84+
klog.V(1).Infof("Returned ipset mutex lock")
85+
}()
86+
7987
ipset, err := utils.NewIPSet(false)
8088
if err != nil {
8189
return nil, nil, err

pkg/controllers/proxy/network_services_controller.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"syscall"
1818
"time"
1919

20+
"golang.org/x/net/context"
21+
2022
"github.com/cloudnativelabs/kube-router/pkg/cri"
2123
"github.com/cloudnativelabs/kube-router/pkg/healthcheck"
2224
"github.com/cloudnativelabs/kube-router/pkg/metrics"
@@ -28,7 +30,6 @@ import (
2830
"github.com/prometheus/client_golang/prometheus"
2931
"github.com/vishvananda/netlink"
3032
"github.com/vishvananda/netns"
31-
"golang.org/x/net/context"
3233
api "k8s.io/api/core/v1"
3334
"k8s.io/client-go/kubernetes"
3435
"k8s.io/client-go/tools/cache"
@@ -220,6 +221,7 @@ type NetworkServicesController struct {
220221
ln LinuxNetworking
221222
readyForUpdates bool
222223
ProxyFirewallSetup *sync.Cond
224+
ipsetMutex *sync.Mutex
223225

224226
// Map of ipsets that we use.
225227
ipsetMap map[string]*utils.Set
@@ -648,6 +650,13 @@ func (nsc *NetworkServicesController) cleanupIpvsFirewall() {
648650
}
649651

650652
// Clear ipsets.
653+
klog.V(1).Infof("Attempting to attain ipset mutex lock")
654+
nsc.ipsetMutex.Lock()
655+
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
656+
defer func() {
657+
nsc.ipsetMutex.Unlock()
658+
klog.V(1).Infof("Returned ipset mutex lock")
659+
}()
651660
ipSetHandler, err := utils.NewIPSet(false)
652661
if err != nil {
653662
klog.Errorf("Failed to initialize ipset handler: %s", err.Error())
@@ -674,6 +683,13 @@ func (nsc *NetworkServicesController) syncIpvsFirewall() error {
674683
- update ipsets based on currently active IPVS services
675684
*/
676685
var err error
686+
klog.V(1).Infof("Attempting to attain ipset mutex lock")
687+
nsc.ipsetMutex.Lock()
688+
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
689+
defer func() {
690+
nsc.ipsetMutex.Unlock()
691+
klog.V(1).Infof("Returned ipset mutex lock")
692+
}()
677693

678694
localIPsIPSet := nsc.ipsetMap[localIPsIPSetName]
679695

@@ -2462,15 +2478,15 @@ func (nsc *NetworkServicesController) handleServiceDelete(obj interface{}) {
24622478
// NewNetworkServicesController returns NetworkServicesController object
24632479
func NewNetworkServicesController(clientset kubernetes.Interface,
24642480
config *options.KubeRouterConfig, svcInformer cache.SharedIndexInformer,
2465-
epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer) (*NetworkServicesController, error) {
2481+
epInformer cache.SharedIndexInformer, podInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkServicesController, error) {
24662482

24672483
var err error
24682484
ln, err := newLinuxNetworking()
24692485
if err != nil {
24702486
return nil, err
24712487
}
24722488

2473-
nsc := NetworkServicesController{ln: ln}
2489+
nsc := NetworkServicesController{ln: ln, ipsetMutex: ipsetMutex}
24742490

24752491
if config.MetricsEnabled {
24762492
//Register the metrics for this controller

pkg/controllers/routing/network_routes_controller.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ type NetworkRoutingController struct {
118118
overrideNextHop bool
119119
podCidr string
120120
CNIFirewallSetup *sync.Cond
121+
ipsetMutex *sync.Mutex
121122

122123
nodeLister cache.Indexer
123124
svcLister cache.Indexer
@@ -645,6 +646,13 @@ func (nrc *NetworkRoutingController) Cleanup() {
645646
}
646647

647648
// delete all ipsets created by kube-router
649+
klog.V(1).Infof("Attempting to attain ipset mutex lock")
650+
nrc.ipsetMutex.Lock()
651+
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
652+
defer func() {
653+
nrc.ipsetMutex.Unlock()
654+
klog.V(1).Infof("Returned ipset mutex lock")
655+
}()
648656
ipset, err := utils.NewIPSet(nrc.isIpv6)
649657
if err != nil {
650658
klog.Errorf("Failed to clean up ipsets: " + err.Error())
@@ -668,6 +676,13 @@ func (nrc *NetworkRoutingController) syncNodeIPSets() error {
668676
metrics.ControllerRoutesSyncTime.Observe(time.Since(start).Seconds())
669677
}
670678
}()
679+
klog.V(1).Infof("Attempting to attain ipset mutex lock")
680+
nrc.ipsetMutex.Lock()
681+
klog.V(1).Infof("Attained ipset mutex lock, continuing...")
682+
defer func() {
683+
nrc.ipsetMutex.Unlock()
684+
klog.V(1).Infof("Returned ipset mutex lock")
685+
}()
671686

672687
nodes := nrc.nodeLister.List()
673688

@@ -1019,11 +1034,11 @@ func (nrc *NetworkRoutingController) startBgpServer(grpcServer bool) error {
10191034
func NewNetworkRoutingController(clientset kubernetes.Interface,
10201035
kubeRouterConfig *options.KubeRouterConfig,
10211036
nodeInformer cache.SharedIndexInformer, svcInformer cache.SharedIndexInformer,
1022-
epInformer cache.SharedIndexInformer) (*NetworkRoutingController, error) {
1037+
epInformer cache.SharedIndexInformer, ipsetMutex *sync.Mutex) (*NetworkRoutingController, error) {
10231038

10241039
var err error
10251040

1026-
nrc := NetworkRoutingController{}
1041+
nrc := NetworkRoutingController{ipsetMutex: ipsetMutex}
10271042
if kubeRouterConfig.MetricsEnabled {
10281043
//Register the metrics for this controller
10291044
prometheus.MustRegister(metrics.ControllerBGPadvertisementsReceived)

0 commit comments

Comments
 (0)