Skip to content

Commit 6f671f1

Browse files
authored
NETOBSERV-2063: fix agent fds leak (#513)
- make sure to close netnshandle - better handling for TCX already attached logic and avoid caching nil link handle Signed-off-by: Mohamed Mahmoud <[email protected]>
1 parent 7ab0c5b commit 6f671f1

File tree

2 files changed

+134
-26
lines changed

2 files changed

+134
-26
lines changed

pkg/ifaces/watcher.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
7171
defer func() {
7272
close(doneChan)
7373
delete(w.nsDone, ns)
74+
if netnsHandle.IsOpen() {
75+
netnsHandle.Close()
76+
}
7477
}()
7578
// subscribe for interface events
7679
links := make(chan netlink.LinkUpdate)
@@ -82,7 +85,6 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
8285
return false, nil
8386
}
8487
}
85-
8688
if err = w.linkSubscriberAt(netnsHandle, links, doneChan); err != nil {
8789
log.WithFields(logrus.Fields{
8890
"netns": ns,
@@ -163,7 +165,6 @@ func getNetNS() ([]string, error) {
163165
log.Warningf("can't detect any network-namespaces err: %v [Ignore if the agent privileged flag is not set]", err)
164166
return nil, fmt.Errorf("failed to list network-namespaces: %w", err)
165167
}
166-
167168
netns := []string{""}
168169
if len(files) == 0 {
169170
log.WithField("netns", files).Debug("empty network-namespaces list")

pkg/tracer/tracer.go

Lines changed: 131 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@ const (
3535
// ebpf map names as defined in bpf/maps_definition.h
3636
aggregatedFlowsMap = "aggregated_flows"
3737
additionalFlowMetrics = "additional_flow_metrics"
38+
directFlowsMap = "direct_flows"
3839
dnsLatencyMap = "dns_flows"
39-
flowFilterMap = "filter_map"
40-
flowPeerFilterMap = "peer_filter_map"
40+
filterMap = "filter_map"
41+
peerFilterMap = "peer_filter_map"
42+
globalCountersMap = "global_counters"
43+
pcaRecordsMap = "packet_record"
4144
// constants defined in flows.c as "volatile const"
4245
constSampling = "sampling"
4346
constHasFilterSampling = "has_filter_sampling"
@@ -52,7 +55,6 @@ const (
5255
constEnablePktTranslation = "enable_pkt_translation_tracking"
5356
pktDropHook = "kfree_skb"
5457
constPcaEnable = "enable_pca"
55-
pcaRecordsMap = "packet_record"
5658
tcEgressFilterName = "tc/tc_egress_flow_parse"
5759
tcIngressFilterName = "tc/tc_ingress_flow_parse"
5860
tcpFentryHook = "tcp_rcv_fentry"
@@ -137,8 +139,15 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
137139
spec.Maps[additionalFlowMetrics].MaxEntries = uint32(cfg.CacheMaxSize)
138140

139141
// remove pinning from all maps
140-
maps2Name := []string{"aggregated_flows", "additional_flow_metrics", "direct_flows", "dns_flows", "filter_map", "peer_filter_map", "global_counters", "packet_record"}
141-
for _, m := range maps2Name {
142+
for _, m := range []string{
143+
aggregatedFlowsMap,
144+
additionalFlowMetrics,
145+
directFlowsMap,
146+
dnsLatencyMap,
147+
filterMap,
148+
peerFilterMap,
149+
globalCountersMap,
150+
pcaRecordsMap} {
142151
spec.Maps[m].Pinning = 0
143152
}
144153

@@ -171,8 +180,8 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
171180
enableFlowFiltering = 1
172181
hasFilterSampling = filter.hasSampling()
173182
} else {
174-
spec.Maps[flowFilterMap].MaxEntries = 1
175-
spec.Maps[flowPeerFilterMap].MaxEntries = 1
183+
spec.Maps[filterMap].MaxEntries = 1
184+
spec.Maps[peerFilterMap].MaxEntries = 1
176185
}
177186
enableNetworkEventsMonitoring := 0
178187
if cfg.EnableNetworkEventsMonitoring {
@@ -294,47 +303,49 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
294303
}
295304

296305
log.Info("BPFManager mode: loading aggregated flows pinned maps")
297-
mPath := path.Join(pinDir, "aggregated_flows")
306+
mPath := path.Join(pinDir, aggregatedFlowsMap)
298307
objects.BpfMaps.AggregatedFlows, err = cilium.LoadPinnedMap(mPath, opts)
299308
if err != nil {
300309
return nil, fmt.Errorf("failed to load %s: %w", mPath, err)
301310
}
302311
log.Info("BPFManager mode: loading additional flow metrics pinned maps")
303-
mPath = path.Join(pinDir, "additional_flow_metrics")
312+
mPath = path.Join(pinDir, additionalFlowMetrics)
304313
objects.BpfMaps.AdditionalFlowMetrics, err = cilium.LoadPinnedMap(mPath, opts)
305314
if err != nil {
306315
return nil, fmt.Errorf("failed to load %s: %w", mPath, err)
307316
}
308317
log.Info("BPFManager mode: loading direct flows pinned maps")
309-
mPath = path.Join(pinDir, "direct_flows")
318+
mPath = path.Join(pinDir, directFlowsMap)
310319
objects.BpfMaps.DirectFlows, err = cilium.LoadPinnedMap(mPath, opts)
311320
if err != nil {
312321
return nil, fmt.Errorf("failed to load %s: %w", mPath, err)
313322
}
314323
log.Infof("BPFManager mode: loading DNS flows pinned maps")
315-
mPath = path.Join(pinDir, "dns_flows")
324+
mPath = path.Join(pinDir, dnsLatencyMap)
316325
objects.BpfMaps.DnsFlows, err = cilium.LoadPinnedMap(mPath, opts)
317326
if err != nil {
318327
return nil, fmt.Errorf("failed to load %s: %w", mPath, err)
319328
}
320329
log.Infof("BPFManager mode: loading filter pinned maps")
321-
mPath = path.Join(pinDir, "filter_map")
330+
mPath = path.Join(pinDir, filterMap)
322331
objects.BpfMaps.FilterMap, err = cilium.LoadPinnedMap(mPath, opts)
323332
if err != nil {
324333
return nil, fmt.Errorf("failed to load %s: %w", mPath, err)
325334
}
335+
log.Infof("BPFManager mode: loading Peerfilter pinned maps")
336+
mPath = path.Join(pinDir, peerFilterMap)
326337
objects.BpfMaps.PeerFilterMap, err = cilium.LoadPinnedMap(mPath, opts)
327338
if err != nil {
328339
return nil, fmt.Errorf("failed to load %s: %w", mPath, err)
329340
}
330341
log.Infof("BPFManager mode: loading global counters pinned maps")
331-
mPath = path.Join(pinDir, "global_counters")
342+
mPath = path.Join(pinDir, globalCountersMap)
332343
objects.BpfMaps.GlobalCounters, err = cilium.LoadPinnedMap(mPath, opts)
333344
if err != nil {
334345
return nil, fmt.Errorf("failed to load %s: %w", mPath, err)
335346
}
336347
log.Infof("BPFManager mode: loading packet record pinned maps")
337-
mPath = path.Join(pinDir, "packet_record")
348+
mPath = path.Join(pinDir, pcaRecordsMap)
338349
objects.BpfMaps.PacketRecord, err = cilium.LoadPinnedMap(mPath, opts)
339350
if err != nil {
340351
return nil, fmt.Errorf("failed to load %s: %w", mPath, err)
@@ -402,12 +413,29 @@ func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error {
402413
if errors.Is(err, fs.ErrExist) {
403414
// The interface already has a TCX egress hook
404415
log.WithField("iface", iface.Name).Debug("interface already has a TCX egress hook ignore")
416+
if q, err := link.QueryPrograms(link.QueryOptions{
417+
Target: iface.Index,
418+
Attach: cilium.AttachTCXEgress,
419+
}); err == nil {
420+
for _, id := range q.Programs {
421+
linkID, ok := id.LinkID()
422+
if !ok {
423+
return fmt.Errorf("failed to get linkID for %s: %w", iface.Name, err)
424+
}
425+
if egrLink, err = link.NewFromID(linkID); err != nil {
426+
return fmt.Errorf("failed to get link for egress flow to %s: %w", iface.Name, err)
427+
}
428+
ilog.WithField("link", linkID).Debug("attaching egress flow to link")
429+
}
430+
} else {
431+
return fmt.Errorf("failed to query TCX egress flow to %s: %w", iface.Name, err)
432+
}
405433
} else {
406434
return fmt.Errorf("failed to attach TCX egress: %w", err)
407435
}
408436
}
409437
m.egressTCXLink[iface] = egrLink
410-
ilog.WithField("interface", iface.Name).Debug("successfully attach egressTCX hook")
438+
ilog.WithField("interface", iface.Name).Debugf("successfully attach egressTCX hook link: %v", egrLink)
411439
}
412440

413441
if m.enableIngress {
@@ -420,12 +448,29 @@ func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error {
420448
if errors.Is(err, fs.ErrExist) {
421449
// The interface already has a TCX ingress hook
422450
log.WithField("iface", iface.Name).Debug("interface already has a TCX ingress hook ignore")
451+
if q, err := link.QueryPrograms(link.QueryOptions{
452+
Target: iface.Index,
453+
Attach: cilium.AttachTCXIngress,
454+
}); err == nil {
455+
for _, id := range q.Programs {
456+
linkID, ok := id.LinkID()
457+
if !ok {
458+
return fmt.Errorf("failed to get linkID for %s: %w", iface.Name, err)
459+
}
460+
if ingLink, err = link.NewFromID(linkID); err != nil {
461+
return fmt.Errorf("failed to get link for ingress flow to %s: %w", iface.Name, err)
462+
}
463+
ilog.WithField("link", linkID).Debug("attaching ingress flow to link")
464+
}
465+
} else {
466+
return fmt.Errorf("failed to query existing TCX ingress flow to %s: %w", iface.Name, err)
467+
}
423468
} else {
424469
return fmt.Errorf("failed to attach TCX ingress: %w", err)
425470
}
426471
}
427472
m.ingressTCXLink[iface] = ingLink
428-
ilog.WithField("interface", iface.Name).Debug("successfully attach ingressTCX hook")
473+
ilog.WithField("interface", iface.Name).Debugf("successfully attach ingressTCX hook link: %v", ingLink)
429474
}
430475

431476
return nil
@@ -453,7 +498,8 @@ func (m *FlowFetcher) DetachTCX(iface ifaces.Interface) error {
453498
if err := l.Close(); err != nil {
454499
return fmt.Errorf("TCX: failed to close egress link: %w", err)
455500
}
456-
ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook")
501+
ilog.WithField("interface", iface.Name).Debugf("successfully detach egressTCX hook link: %v",
502+
m.egressTCXLink[iface])
457503
} else {
458504
return fmt.Errorf("egress link does not have a TCX egress hook")
459505
}
@@ -464,7 +510,8 @@ func (m *FlowFetcher) DetachTCX(iface ifaces.Interface) error {
464510
if err := l.Close(); err != nil {
465511
return fmt.Errorf("TCX: failed to close ingress link: %w", err)
466512
}
467-
ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook")
513+
ilog.WithField("interface", iface.Name).Debugf("successfully detach ingressTCX hook link: %v",
514+
m.ingressTCXLink[iface])
468515
} else {
469516
return fmt.Errorf("ingress link does not have a TCX ingress hook")
470517
}
@@ -728,24 +775,45 @@ func (m *FlowFetcher) Close() error {
728775
if err := m.objects.TcxIngressFlowParse.Close(); err != nil {
729776
errs = append(errs, err)
730777
}
778+
if err := m.objects.AggregatedFlows.Unpin(); err != nil {
779+
errs = append(errs, err)
780+
}
731781
if err := m.objects.AggregatedFlows.Close(); err != nil {
732782
errs = append(errs, err)
733783
}
784+
if err := m.objects.AdditionalFlowMetrics.Unpin(); err != nil {
785+
errs = append(errs, err)
786+
}
734787
if err := m.objects.AdditionalFlowMetrics.Close(); err != nil {
735788
errs = append(errs, err)
736789
}
790+
if err := m.objects.DirectFlows.Unpin(); err != nil {
791+
errs = append(errs, err)
792+
}
737793
if err := m.objects.DirectFlows.Close(); err != nil {
738794
errs = append(errs, err)
739795
}
796+
if err := m.objects.DnsFlows.Unpin(); err != nil {
797+
errs = append(errs, err)
798+
}
740799
if err := m.objects.DnsFlows.Close(); err != nil {
741800
errs = append(errs, err)
742801
}
802+
if err := m.objects.GlobalCounters.Unpin(); err != nil {
803+
errs = append(errs, err)
804+
}
743805
if err := m.objects.GlobalCounters.Close(); err != nil {
744806
errs = append(errs, err)
745807
}
808+
if err := m.objects.FilterMap.Unpin(); err != nil {
809+
errs = append(errs, err)
810+
}
746811
if err := m.objects.FilterMap.Close(); err != nil {
747812
errs = append(errs, err)
748813
}
814+
if err := m.objects.PeerFilterMap.Unpin(); err != nil {
815+
errs = append(errs, err)
816+
}
749817
if err := m.objects.PeerFilterMap.Close(); err != nil {
750818
errs = append(errs, err)
751819
}
@@ -792,10 +860,8 @@ func (m *FlowFetcher) Close() error {
792860
}
793861
m.ingressTCXLink = map[ifaces.Interface]link.Link{}
794862

795-
if !m.useEbpfManager {
796-
if err := m.removeAllPins(); err != nil {
797-
errs = append(errs, err)
798-
}
863+
if err := m.removeAllPins(); err != nil {
864+
errs = append(errs, err)
799865
}
800866

801867
if len(errs) == 0 {
@@ -1297,8 +1363,15 @@ func NewPacketFetcher(cfg *FlowFetcherConfig) (*PacketFetcher, error) {
12971363
}
12981364

12991365
// remove pinning from all maps
1300-
maps2Name := []string{"aggregated_flows", "additional_flow_metrics", "direct_flows", "dns_flows", "filter_map", "global_counters", "packet_record"}
1301-
for _, m := range maps2Name {
1366+
for _, m := range []string{
1367+
aggregatedFlowsMap,
1368+
additionalFlowMetrics,
1369+
directFlowsMap,
1370+
dnsLatencyMap,
1371+
filterMap,
1372+
peerFilterMap,
1373+
globalCountersMap,
1374+
pcaRecordsMap} {
13021375
spec.Maps[m].Pinning = 0
13031376
}
13041377

@@ -1510,6 +1583,23 @@ func (p *PacketFetcher) AttachTCX(iface ifaces.Interface) error {
15101583
if errors.Is(err, fs.ErrExist) {
15111584
// The interface already has a TCX egress hook
15121585
log.WithField("iface", iface.Name).Debug("interface already has a TCX PCA egress hook ignore")
1586+
if q, err := link.QueryPrograms(link.QueryOptions{
1587+
Target: iface.Index,
1588+
Attach: cilium.AttachTCXEgress,
1589+
}); err == nil {
1590+
for _, id := range q.Programs {
1591+
linkID, ok := id.LinkID()
1592+
if !ok {
1593+
return fmt.Errorf("failed to get linkID for %s: %w", iface.Name, err)
1594+
}
1595+
if egrLink, err = link.NewFromID(linkID); err != nil {
1596+
return fmt.Errorf("failed to get link for egress flow to %s: %w", iface.Name, err)
1597+
}
1598+
ilog.WithField("link", linkID).Debug("attaching egress flow to link")
1599+
}
1600+
} else {
1601+
return fmt.Errorf("failed to query TCX egress flow to %s: %w", iface.Name, err)
1602+
}
15131603
} else {
15141604
return fmt.Errorf("failed to attach PCA TCX egress: %w", err)
15151605
}
@@ -1528,6 +1618,23 @@ func (p *PacketFetcher) AttachTCX(iface ifaces.Interface) error {
15281618
if errors.Is(err, fs.ErrExist) {
15291619
// The interface already has a TCX ingress hook
15301620
log.WithField("iface", iface.Name).Debug("interface already has a TCX PCA ingress hook ignore")
1621+
if q, err := link.QueryPrograms(link.QueryOptions{
1622+
Target: iface.Index,
1623+
Attach: cilium.AttachTCXEgress,
1624+
}); err == nil {
1625+
for _, id := range q.Programs {
1626+
linkID, ok := id.LinkID()
1627+
if !ok {
1628+
return fmt.Errorf("failed to get linkID for %s: %w", iface.Name, err)
1629+
}
1630+
if ingLink, err = link.NewFromID(linkID); err != nil {
1631+
return fmt.Errorf("failed to get link for ingress flow to %s: %w", iface.Name, err)
1632+
}
1633+
ilog.WithField("link", linkID).Debug("attaching ingress flow to link")
1634+
}
1635+
} else {
1636+
return fmt.Errorf("failed to query TCX ingress flow to %s: %w", iface.Name, err)
1637+
}
15311638
} else {
15321639
return fmt.Errorf("failed to attach PCA TCX ingress: %w", err)
15331640
}

0 commit comments

Comments
 (0)