Skip to content

Commit 85d5340

Browse files
authored
NETOBSERV-2077: switch to syncMap to handle concurrency issues (#537)
Signed-off-by: Mohamed Mahmoud <[email protected]> (cherry picked from commit 6feda50)
1 parent c2d377a commit 85d5340

File tree

1 file changed

+46
-28
lines changed

1 file changed

+46
-28
lines changed

pkg/ifaces/watcher.go

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ const (
2020
netnsVolume = "/var/run/netns"
2121
)
2222

23+
var log = logrus.WithField("component", "ifaces.Watcher")
24+
2325
// Watcher uses system's netlink to get real-time information events about network interfaces'
2426
// addition or removal.
2527
type Watcher struct {
@@ -31,7 +33,7 @@ type Watcher struct {
3133
linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error
3234
mutex *sync.Mutex
3335
netnsWatcher *fsnotify.Watcher
34-
nsDone map[string]chan struct{}
36+
nsDone sync.Map
3537
}
3638

3739
func NewWatcher(bufLen int) *Watcher {
@@ -42,19 +44,19 @@ func NewWatcher(bufLen int) *Watcher {
4244
linkSubscriberAt: netlink.LinkSubscribeAt,
4345
mutex: &sync.Mutex{},
4446
netnsWatcher: &fsnotify.Watcher{},
45-
nsDone: make(map[string]chan struct{}),
47+
nsDone: sync.Map{},
4648
}
4749
}
4850

4951
func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
5052
out := make(chan Event, w.bufLen)
5153
netns, err := getNetNS()
5254
if err != nil {
53-
w.nsDone[""] = make(chan struct{})
55+
w.nsDone.Store("", make(chan struct{}))
5456
go w.sendUpdates(ctx, "", out)
5557
} else {
5658
for _, n := range netns {
57-
w.nsDone[n] = make(chan struct{})
59+
w.nsDone.Store(n, make(chan struct{}))
5860
go w.sendUpdates(ctx, n, out)
5961
}
6062
}
@@ -66,11 +68,13 @@ func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
6668
func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
6769
var netnsHandle netns.NsHandle
6870
var err error
69-
log := logrus.WithField("component", "ifaces.Watcher")
70-
doneChan := w.nsDone[ns]
71+
ch, ok := w.nsDone.Load(ns)
72+
if !ok {
73+
log.WithError(err).Warnf("netns %s not found in netns map", ns)
74+
return
75+
}
76+
doneChan := ch.(chan struct{})
7177
defer func() {
72-
close(doneChan)
73-
delete(w.nsDone, ns)
7478
if netnsHandle.IsOpen() {
7579
netnsHandle.Close()
7680
}
@@ -181,9 +185,41 @@ func getNetNS() ([]string, error) {
181185
return netns, nil
182186
}
183187

188+
func (w *Watcher) handleEvent(ctx context.Context, event fsnotify.Event, out chan Event) {
189+
ns := filepath.Base(event.Name)
190+
191+
switch {
192+
case event.Op&fsnotify.Create == fsnotify.Create:
193+
log.WithField("netns", ns).Debug("netns create notification")
194+
w.createNamespace(ctx, ns, out)
195+
case event.Op&fsnotify.Remove == fsnotify.Remove:
196+
log.WithField("netns", ns).Debug("netns delete notification")
197+
w.deleteNamespace(ns)
198+
}
199+
}
200+
201+
func (w *Watcher) createNamespace(ctx context.Context, ns string, out chan Event) {
202+
if ch, ok := w.nsDone.Load(ns); ok {
203+
log.WithField("netns", ns).Debug("netns channel already exists, deleting it")
204+
close(ch.(chan struct{}))
205+
w.nsDone.Delete(ns)
206+
}
207+
208+
w.nsDone.Store(ns, make(chan struct{}))
209+
go w.sendUpdates(ctx, ns, out)
210+
}
211+
212+
func (w *Watcher) deleteNamespace(ns string) {
213+
if ch, ok := w.nsDone.Load(ns); ok {
214+
close(ch.(chan struct{}))
215+
w.nsDone.Delete(ns)
216+
} else {
217+
log.WithField("netns", ns).Debug("netns delete but no channel exists")
218+
}
219+
}
220+
184221
func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
185222
var err error
186-
log := logrus.WithField("component", "ifaces.Watcher")
187223

188224
w.netnsWatcher, err = fsnotify.NewWatcher()
189225
if err != nil {
@@ -198,25 +234,7 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
198234
if !ok {
199235
return
200236
}
201-
if event.Op&fsnotify.Create == fsnotify.Create {
202-
ns := filepath.Base(event.Name)
203-
log.WithField("netns", ns).Debug("netns create notification")
204-
if _, ok := w.nsDone[ns]; ok {
205-
log.WithField("netns", ns).Debug("netns channel already exists, delete it")
206-
delete(w.nsDone, ns)
207-
}
208-
w.nsDone[ns] = make(chan struct{})
209-
go w.sendUpdates(ctx, ns, out)
210-
}
211-
if event.Op&fsnotify.Remove == fsnotify.Remove {
212-
ns := filepath.Base(event.Name)
213-
log.WithField("netns", ns).Debug("netns delete notification")
214-
if _, ok := w.nsDone[ns]; ok {
215-
w.nsDone[ns] <- struct{}{}
216-
} else {
217-
log.WithField("netns", ns).Debug("netns delete but there is no channel to send events to")
218-
}
219-
}
237+
w.handleEvent(ctx, event, out)
220238
case err, ok := <-w.netnsWatcher.Errors:
221239
if !ok {
222240
return

0 commit comments

Comments
 (0)