Skip to content

Commit 58e5d37

Browse files
authored
NETOBSERV-1697: Add retry around netlinkSubscribeAt (#358)
Signed-off-by: Mohamed Mahmoud <[email protected]>
1 parent fdebe3f commit 58e5d37

File tree

7 files changed

+128
-64
lines changed

7 files changed

+128
-64
lines changed

pkg/ebpf/tracer.go

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -208,6 +208,16 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
208208
func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error {
209209
ilog := log.WithField("iface", iface)
210210
if iface.NetNS != netns.None() {
211+
originalNs, err := netns.Get()
212+
if err != nil {
213+
return fmt.Errorf("failed to get current netns: %w", err)
214+
}
215+
defer func() {
216+
if err := netns.Set(originalNs); err != nil {
217+
ilog.WithError(err).Error("failed to set netns back")
218+
}
219+
originalNs.Close()
220+
}()
211221
if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil {
212222
return fmt.Errorf("failed to setns to %s: %w", iface.NetNS, err)
213223
}
@@ -823,6 +833,16 @@ func (p *PacketFetcher) Register(iface ifaces.Interface) error {
823833
func (p *PacketFetcher) AttachTCX(iface ifaces.Interface) error {
824834
ilog := log.WithField("iface", iface)
825835
if iface.NetNS != netns.None() {
836+
originalNs, err := netns.Get()
837+
if err != nil {
838+
return fmt.Errorf("PCA failed to get current netns: %w", err)
839+
}
840+
defer func() {
841+
if err := netns.Set(originalNs); err != nil {
842+
ilog.WithError(err).Error("PCA failed to set netns back")
843+
}
844+
originalNs.Close()
845+
}()
826846
if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil {
827847
return fmt.Errorf("PCA failed to setns to %s: %w", iface.NetNS, err)
828848
}

pkg/ifaces/informer.go

Lines changed: 15 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -3,9 +3,8 @@ package ifaces
33
import (
44
"context"
55
"fmt"
6-
"net"
7-
86
"github.com/sirupsen/logrus"
7+
"github.com/vishvananda/netlink"
98
"github.com/vishvananda/netns"
109
)
1110

@@ -49,14 +48,22 @@ type Informer interface {
4948
Subscribe(ctx context.Context) (<-chan Event, error)
5049
}
5150

52-
func netInterfaces() ([]Interface, error) {
53-
ifs, err := net.Interfaces()
51+
func netInterfaces(nsh netns.NsHandle) ([]Interface, error) {
52+
handle, err := netlink.NewHandleAt(nsh)
5453
if err != nil {
55-
return nil, fmt.Errorf("can't fetch interfaces: %w", err)
54+
return nil, fmt.Errorf("failed to create handle for netns (%s): %w", nsh.String(), err)
5655
}
57-
names := make([]Interface, len(ifs))
58-
for i, ifc := range ifs {
59-
names[i] = Interface{Name: ifc.Name, Index: ifc.Index, NetNS: netns.None()}
56+
defer handle.Delete()
57+
58+
// Get a list of interfaces in the namespace
59+
links, err := handle.LinkList()
60+
if err != nil {
61+
return nil, fmt.Errorf("failed to list interfaces in netns (%s): %w", nsh.String(), err)
62+
}
63+
64+
names := make([]Interface, len(links))
65+
for i, link := range links {
66+
names[i] = Interface{Name: link.Attrs().Name, Index: link.Attrs().Index, NetNS: nsh}
6067
}
6168
return names, nil
6269
}

pkg/ifaces/poller.go

Lines changed: 45 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -5,14 +5,15 @@ import (
55
"time"
66

77
"github.com/sirupsen/logrus"
8+
"github.com/vishvananda/netns"
89
)
910

1011
// Poller periodically looks for the network interfaces in the system and forwards Event
1112
// notifications when interfaces are added or deleted.
1213
type Poller struct {
1314
period time.Duration
1415
current map[Interface]struct{}
15-
interfaces func() ([]Interface, error)
16+
interfaces func(handle netns.NsHandle) ([]Interface, error)
1617
bufLen int
1718
}
1819

@@ -26,32 +27,54 @@ func NewPoller(period time.Duration, bufLen int) *Poller {
2627
}
2728

2829
func (np *Poller) Subscribe(ctx context.Context) (<-chan Event, error) {
29-
log := logrus.WithField("component", "ifaces.Poller")
30-
log.WithField("period", np.period).Debug("subscribing to Interface events")
30+
3131
out := make(chan Event, np.bufLen)
32-
go func() {
33-
ticker := time.NewTicker(np.period)
34-
defer ticker.Stop()
35-
for {
36-
if ifaces, err := np.interfaces(); err != nil {
37-
log.WithError(err).Warn("fetching interface names")
38-
} else {
39-
log.WithField("names", ifaces).Debug("fetched interface names")
40-
np.diffNames(out, ifaces)
41-
}
42-
select {
43-
case <-ctx.Done():
44-
log.Debug("stopped")
45-
close(out)
46-
return
47-
case <-ticker.C:
48-
// continue after period
49-
}
32+
netns, err := getNetNS()
33+
if err != nil {
34+
go np.pollForEvents(ctx, "", out)
35+
} else {
36+
for _, n := range netns {
37+
go np.pollForEvents(ctx, n, out)
5038
}
51-
}()
39+
}
5240
return out, nil
5341
}
5442

43+
func (np *Poller) pollForEvents(ctx context.Context, ns string, out chan Event) {
44+
log := logrus.WithField("component", "ifaces.Poller")
45+
log.WithField("period", np.period).Debug("subscribing to Interface events")
46+
ticker := time.NewTicker(np.period)
47+
var netnsHandle netns.NsHandle
48+
var err error
49+
50+
if ns == "" {
51+
netnsHandle = netns.None()
52+
} else {
53+
netnsHandle, err = netns.GetFromName(ns)
54+
if err != nil {
55+
return
56+
}
57+
}
58+
59+
defer ticker.Stop()
60+
for {
61+
if ifaces, err := np.interfaces(netnsHandle); err != nil {
62+
log.WithError(err).Warn("fetching interface names")
63+
} else {
64+
log.WithField("names", ifaces).Debug("fetched interface names")
65+
np.diffNames(out, ifaces)
66+
}
67+
select {
68+
case <-ctx.Done():
69+
log.Debug("stopped")
70+
close(out)
71+
return
72+
case <-ticker.C:
73+
// continue after a period
74+
}
75+
}
76+
}
77+
5578
// diffNames compares and updates the internal account of interfaces with the latest list of
5679
// polled interfaces. It forwards Events for any detected addition or removal of interfaces.
5780
func (np *Poller) diffNames(events chan Event, ifaces []Interface) {

pkg/ifaces/poller_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ func TestPoller(t *testing.T) {
1919
// fake net.Interfaces implementation that returns two different sets of
2020
// interfaces on successive invocations
2121
firstInvocation := true
22-
var fakeInterfaces = func() ([]Interface, error) {
22+
var fakeInterfaces = func(handle netns.NsHandle) ([]Interface, error) {
2323
if firstInvocation {
2424
firstInvocation = false
2525
return []Interface{{"foo", 1, netns.None()}, {"bar", 2, netns.None()}}, nil

pkg/ifaces/registerer_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@ func TestRegisterer(t *testing.T) {
1717
watcher := NewWatcher(10)
1818
registry := NewRegisterer(watcher, 10)
1919
// mock net.Interfaces and linkSubscriber to control which interfaces are discovered
20-
watcher.interfaces = func() ([]Interface, error) {
20+
watcher.interfaces = func(handle netns.NsHandle) ([]Interface, error) {
2121
return []Interface{{"foo", 1, netns.None()}, {"bar", 2, netns.None()}, {"baz", 3, netns.None()}}, nil
2222
}
2323
inputLinks := make(chan netlink.LinkUpdate, 10)

pkg/ifaces/watcher.go

Lines changed: 45 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -7,11 +7,13 @@ import (
77
"path/filepath"
88
"sync"
99
"syscall"
10+
"time"
1011

1112
"github.com/fsnotify/fsnotify"
1213
"github.com/sirupsen/logrus"
1314
"github.com/vishvananda/netlink"
1415
"github.com/vishvananda/netns"
16+
"k8s.io/apimachinery/pkg/util/wait"
1517
)
1618

1719
const (
@@ -23,7 +25,7 @@ const (
2325
type Watcher struct {
2426
bufLen int
2527
current map[Interface]struct{}
26-
interfaces func() ([]Interface, error)
28+
interfaces func(handle netns.NsHandle) ([]Interface, error)
2729
// linkSubscriber abstracts netlink.LinkSubscribe implementation, allowing the injection of
2830
// mocks for unit testing
2931
linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error
@@ -45,34 +47,58 @@ func NewWatcher(bufLen int) *Watcher {
4547
func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
4648
out := make(chan Event, w.bufLen)
4749

48-
nsHandles, err := getNetNSHandles()
50+
netns, err := getNetNS()
4951
if err != nil {
50-
go w.sendUpdates(ctx, netns.None(), out)
52+
go w.sendUpdates(ctx, "", out)
5153
} else {
52-
for _, nsh := range nsHandles {
53-
nsHandle := nsh
54-
go w.sendUpdates(ctx, nsHandle, out)
54+
for _, n := range netns {
55+
go w.sendUpdates(ctx, n, out)
5556
}
5657
}
5758
// register to get notification when netns is created or deleted and register for link update for new netns
5859
w.netnsNotify(ctx, out)
5960
return out, nil
6061
}
6162

62-
func (w *Watcher) sendUpdates(ctx context.Context, netnsHandle netns.NsHandle, out chan Event) {
63+
func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
64+
var netnsHandle netns.NsHandle
65+
var err error
6366
log := logrus.WithField("component", "ifaces.Watcher")
6467
// subscribe for interface events
6568
links := make(chan netlink.LinkUpdate)
66-
log.WithField("netns", netnsHandle.String()).Debug("linkSubscribe to receive links update")
67-
if err := w.linkSubscriberAt(netnsHandle, links, ctx.Done()); err != nil {
68-
log.WithError(err).Errorf("can't subscribe to links netns %s", netnsHandle.String())
69+
doneChan := make(chan struct{})
70+
if err = wait.PollUntilContextTimeout(ctx, 50*time.Microsecond, time.Second, true, func(ctx context.Context) (done bool, err error) {
71+
if ns == "" {
72+
netnsHandle = netns.None()
73+
} else {
74+
if netnsHandle, err = netns.GetFromName(ns); err != nil {
75+
return false, nil
76+
}
77+
}
78+
79+
if err = w.linkSubscriberAt(netnsHandle, links, doneChan); err != nil {
80+
log.WithFields(logrus.Fields{
81+
"netns": ns,
82+
"netnsHandle": netnsHandle.String(),
83+
"error": err,
84+
}).Debug("linkSubscribe failed retry")
85+
return false, nil
86+
}
87+
88+
log.WithFields(logrus.Fields{
89+
"netns": ns,
90+
"netnsHandle": netnsHandle.String(),
91+
}).Debug("linkSubscribe to receive links update")
92+
return true, nil
93+
}); err != nil {
94+
log.WithError(err).Errorf("can't subscribe to links netns %s netnsHandle %s", ns, netnsHandle.String())
6995
return
7096
}
7197

7298
// before sending netlink updates, send all the existing interfaces at the moment of starting
7399
// the Watcher
74-
if netnsHandle.Equal(netns.None()) {
75-
if names, err := w.interfaces(); err != nil {
100+
if netnsHandle.IsOpen() || netnsHandle.Equal(netns.None()) {
101+
if names, err := w.interfaces(netnsHandle); err != nil {
76102
log.WithError(err).Error("can't fetch network interfaces. You might be missing flows")
77103
} else {
78104
for _, name := range names {
@@ -119,35 +145,28 @@ func (w *Watcher) sendUpdates(ctx context.Context, netnsHandle netns.NsHandle, o
119145
}
120146
}
121147

122-
func getNetNSHandles() ([]netns.NsHandle, error) {
148+
func getNetNS() ([]string, error) {
123149
log := logrus.WithField("component", "ifaces.Watcher")
124150
files, err := os.ReadDir(netnsVolume)
125151
if err != nil {
126152
log.Warningf("can't detect any network-namespaces err: %v [Ignore if the agent privileged flag is not set]", err)
127153
return nil, fmt.Errorf("failed to list network-namespaces: %w", err)
128154
}
129155

130-
handles := []netns.NsHandle{netns.None()}
156+
netns := []string{""}
131157
if len(files) == 0 {
132158
log.WithField("netns", files).Debug("empty network-namespaces list")
133-
return handles, nil
159+
return netns, nil
134160
}
135161
for _, f := range files {
136162
ns := f.Name()
137-
handle, err := netns.GetFromName(ns)
138-
if err != nil {
139-
log.WithField("netns", ns).Debug("can't get NsHandle for this netns. Ignoring")
140-
continue
141-
}
142-
handles = append(handles, handle)
163+
netns = append(netns, ns)
143164
log.WithFields(logrus.Fields{
144-
"netns": ns,
145-
"handle": handle.String(),
165+
"netns": ns,
146166
}).Debug("Detected network-namespace")
147-
148167
}
149168

150-
return handles, nil
169+
return netns, nil
151170
}
152171

153172
func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
@@ -170,12 +189,7 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
170189
if event.Op&fsnotify.Create == fsnotify.Create {
171190
ns := filepath.Base(event.Name)
172191
log.WithField("netns", ns).Debug("netns notification")
173-
handle, err := netns.GetFromName(ns)
174-
if err != nil {
175-
log.WithField("netns", ns).Debug("can't get NsHandle for this netns. Ignoring")
176-
return
177-
}
178-
go w.sendUpdates(ctx, handle, out)
192+
go w.sendUpdates(ctx, ns, out)
179193
}
180194
case err, ok := <-w.netnsWatcher.Errors:
181195
if !ok {

pkg/ifaces/watcher_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ func TestWatcher(t *testing.T) {
1919

2020
watcher := NewWatcher(10)
2121
// mock net.Interfaces and linkSubscriber to control which interfaces are discovered
22-
watcher.interfaces = func() ([]Interface, error) {
22+
watcher.interfaces = func(handle netns.NsHandle) ([]Interface, error) {
2323
return []Interface{{"foo", 1, netns.None()}, {"bar", 2, netns.None()}, {"baz", 3, netns.None()}}, nil
2424
}
2525
inputLinks := make(chan netlink.LinkUpdate, 10)

0 commit comments

Comments
 (0)