Skip to content

Commit dd5826d

Browse files
authored
Merge pull request #528 from msherif1234/chk_udn
NETOBSERV-2081: Cache and load UDN maps only when udn is enabled
2 parents 08e7a33 + 15cfbbe commit dd5826d

File tree

10 files changed

+58
-45
lines changed

10 files changed

+58
-45
lines changed

pkg/agent/agent.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,9 +296,9 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
296296
samplingGauge := m.CreateSamplingRate()
297297
samplingGauge.Set(float64(cfg.Sampling))
298298

299-
mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m, s)
299+
mapTracer := flow.NewMapTracer(fetcher, cfg.CacheActiveTimeout, cfg.StaleEntriesEvictTimeout, m, s, cfg.EnableUDNMapping)
300300
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m)
301-
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m, s)
301+
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m, s, cfg.EnableUDNMapping)
302302
limiter := flow.NewCapacityLimiter(m)
303303

304304
return &Flows{

pkg/exporter/converters_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestConversions(t *testing.T) {
5555
},
5656
},
5757
},
58-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)},
58+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
5959
TimeFlowStart: someTime,
6060
TimeFlowEnd: someTime,
6161
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -103,7 +103,7 @@ func TestConversions(t *testing.T) {
103103
Sampling: 2,
104104
},
105105
},
106-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)},
106+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
107107
TimeFlowStart: someTime,
108108
TimeFlowEnd: someTime,
109109
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -149,7 +149,7 @@ func TestConversions(t *testing.T) {
149149
Dscp: 64,
150150
},
151151
},
152-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)},
152+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
153153
TimeFlowStart: someTime,
154154
TimeFlowEnd: someTime,
155155
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -194,7 +194,7 @@ func TestConversions(t *testing.T) {
194194
Dscp: 64,
195195
},
196196
},
197-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)},
197+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
198198
TimeFlowStart: someTime,
199199
TimeFlowEnd: someTime,
200200
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -240,7 +240,7 @@ func TestConversions(t *testing.T) {
240240
Packets: 128,
241241
},
242242
},
243-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)},
243+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
244244
TimeFlowStart: someTime,
245245
TimeFlowEnd: someTime,
246246
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -287,7 +287,7 @@ func TestConversions(t *testing.T) {
287287
},
288288
},
289289
},
290-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)},
290+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
291291
TimeFlowStart: someTime,
292292
TimeFlowEnd: someTime,
293293
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -347,7 +347,7 @@ func TestConversions(t *testing.T) {
347347
},
348348
},
349349
},
350-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil)},
350+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("eth0", model.DirectionEgress, nil)},
351351
TimeFlowStart: someTime,
352352
TimeFlowEnd: someTime,
353353
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -412,8 +412,8 @@ func TestConversions(t *testing.T) {
412412
},
413413
},
414414
Interfaces: []model.IntfDirUdn{
415-
model.NewIntfDirUdn("5e6e92caa1d51cf", model.DirectionIngress, nil, nil),
416-
model.NewIntfDirUdn("eth0", model.DirectionEgress, nil, nil),
415+
model.NewIntfDirUdn("5e6e92caa1d51cf", model.DirectionIngress, nil),
416+
model.NewIntfDirUdn("eth0", model.DirectionEgress, nil),
417417
},
418418
TimeFlowStart: someTime,
419419
TimeFlowEnd: someTime,

pkg/exporter/grpc_proto_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
123123
for i := 0; i < 25000; i++ {
124124
input = append(input, &model.Record{Metrics: model.BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{
125125
EthProtocol: model.IPv6Type,
126-
}}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("12345678", 0, nil, nil)}})
126+
}}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("12345678", 0, nil)}})
127127
}
128128
flows <- input
129129
go exporter.ExportFlows(flows)

pkg/exporter/kafka_proto_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestProtoConversion(t *testing.T) {
5151
Flags: uint16(1),
5252
},
5353
},
54-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil, nil), model.NewIntfDirUdn("abcde", 1, nil, nil)},
54+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil), model.NewIntfDirUdn("abcde", 1, nil)},
5555
}
5656

5757
input <- []*model.Record{&record}
@@ -108,7 +108,7 @@ func TestIdenticalKeys(t *testing.T) {
108108
Flags: uint16(1),
109109
},
110110
},
111-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil, nil), model.NewIntfDirUdn("abcde", 1, nil, nil)},
111+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("veth0", 0, nil), model.NewIntfDirUdn("abcde", 1, nil)},
112112
}
113113
key1 := getFlowKey(&record)
114114

pkg/flow/account.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package flow
22

33
import (
4+
"maps"
45
"time"
56

67
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
@@ -23,6 +24,7 @@ type Accounter struct {
2324
monoClock func() time.Duration
2425
metrics *metrics.Metrics
2526
s *ovnobserv.SampleDecoder
27+
udnEnabled bool
2628
}
2729

2830
var alog = logrus.WithField("component", "flow/Accounter")
@@ -35,6 +37,7 @@ func NewAccounter(
3537
monoClock func() time.Duration,
3638
m *metrics.Metrics,
3739
s *ovnobserv.SampleDecoder,
40+
udnEnabled bool,
3841
) *Accounter {
3942
acc := Accounter{
4043
maxEntries: maxEntries,
@@ -44,6 +47,7 @@ func NewAccounter(
4447
monoClock: monoClock,
4548
metrics: m,
4649
s: s,
50+
udnEnabled: udnEnabled,
4751
}
4852
return &acc
4953
}
@@ -99,9 +103,19 @@ func (c *Accounter) evict(entries map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, evict
99103
now := c.clock()
100104
monotonicNow := uint64(c.monoClock())
101105
records := make([]*model.Record, 0, len(entries))
106+
udnCache := make(map[string]string)
107+
if c.s != nil && c.udnEnabled {
108+
udnsMap, err := c.s.GetInterfaceUDNs()
109+
if err != nil {
110+
alog.Errorf("failed to get udns to interfaces map : %v", err)
111+
} else {
112+
maps.Copy(udnCache, udnsMap)
113+
alog.Tracef("GetInterfaceUDNS map: %v", udnCache)
114+
}
115+
}
102116
for key, metrics := range entries {
103117
flowContent := model.NewBpfFlowContent(*metrics)
104-
records = append(records, model.NewRecord(key, &flowContent, now, monotonicNow, c.s))
118+
records = append(records, model.NewRecord(key, &flowContent, now, monotonicNow, c.s, udnCache))
105119
}
106120
c.metrics.EvictionCounter.WithSourceAndReason("accounter", reason).Inc()
107121
c.metrics.EvictedFlowsCounter.WithSourceAndReason("accounter", reason).Add(float64(len(records)))

pkg/flow/account_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestEvict_MaxEntries(t *testing.T) {
5151
return now
5252
}, func() time.Duration {
5353
return 1000
54-
}, metrics.NewMetrics(&metrics.Settings{}), nil)
54+
}, metrics.NewMetrics(&metrics.Settings{}), nil, false)
5555

5656
// WHEN it starts accounting new records
5757
inputs := make(chan *model.RawRecord, 20)
@@ -111,7 +111,7 @@ func TestEvict_MaxEntries(t *testing.T) {
111111
},
112112
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
113113
TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
114-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil, nil)},
114+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)},
115115
},
116116
k2: {
117117
ID: k2,
@@ -122,7 +122,7 @@ func TestEvict_MaxEntries(t *testing.T) {
122122
},
123123
TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond),
124124
TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond),
125-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil, nil)},
125+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)},
126126
},
127127
}, received)
128128
}
@@ -134,7 +134,7 @@ func TestEvict_Period(t *testing.T) {
134134
return now
135135
}, func() time.Duration {
136136
return 1000
137-
}, metrics.NewMetrics(&metrics.Settings{}), nil)
137+
}, metrics.NewMetrics(&metrics.Settings{}), nil, false)
138138

139139
// WHEN it starts accounting new records
140140
inputs := make(chan *model.RawRecord, 20)
@@ -191,7 +191,7 @@ func TestEvict_Period(t *testing.T) {
191191
},
192192
TimeFlowStart: now.Add(-1000 + 123),
193193
TimeFlowEnd: now.Add(-1000 + 789),
194-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil, nil)},
194+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)},
195195
}, *records[0])
196196
records = receiveTimeout(t, evictor)
197197
require.Len(t, records, 1)
@@ -208,7 +208,7 @@ func TestEvict_Period(t *testing.T) {
208208
},
209209
TimeFlowStart: now.Add(-1000 + 1123),
210210
TimeFlowEnd: now.Add(-1000 + 1456),
211-
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil, nil)},
211+
Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn("[namer unset] 0", 0, nil)},
212212
}, *records[0])
213213

214214
// no more flows are evicted

pkg/flow/limiter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func TestCapacityLimiter_NoDrop(t *testing.T) {
1919

2020
// WHEN it buffers less elements than it's maximum capacity
2121
for i := 0; i < 33; i++ {
22-
pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil, nil)}}}
22+
pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil)}}}
2323
}
2424

2525
// THEN it is able to retrieve all the buffered elements
@@ -45,7 +45,7 @@ func TestCapacityLimiter_Drop(t *testing.T) {
4545
// WHEN it receives more elements than its maximum capacity
4646
// (it's not blocking)
4747
for i := 0; i < limiterLen*2; i++ {
48-
pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil, nil)}}}
48+
pipeIn <- []*model.Record{{Interfaces: []model.IntfDirUdn{model.NewIntfDirUdn(strconv.Itoa(i), 0, nil)}}}
4949
}
5050

5151
// THEN it is only able to retrieve all the nth first buffered elements

pkg/flow/tracer_map.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package flow
22

33
import (
44
"context"
5+
"maps"
56
"runtime"
67
"sync"
78
"time"
@@ -30,6 +31,7 @@ type MapTracer struct {
3031
metrics *metrics.Metrics
3132
timeSpentinLookupAndDelete prometheus.Histogram
3233
s *ovnobserv.SampleDecoder
34+
udnEnabled bool
3335
}
3436

3537
type mapFetcher interface {
@@ -38,7 +40,7 @@ type mapFetcher interface {
3840
}
3941

4042
func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout time.Duration, m *metrics.Metrics,
41-
s *ovnobserv.SampleDecoder) *MapTracer {
43+
s *ovnobserv.SampleDecoder, udnEnabled bool) *MapTracer {
4244
return &MapTracer{
4345
mapFetcher: fetcher,
4446
evictionTimeout: evictionTimeout,
@@ -47,6 +49,7 @@ func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout
4749
metrics: m,
4850
timeSpentinLookupAndDelete: m.CreateTimeSpendInLookupAndDelete(),
4951
s: s,
52+
udnEnabled: udnEnabled,
5053
}
5154
}
5255

@@ -105,13 +108,24 @@ func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows c
105108
var forwardingFlows []*model.Record
106109
flows := m.mapFetcher.LookupAndDeleteMap(m.metrics)
107110
elapsed := time.Since(currentTime)
111+
udnCache := make(map[string]string)
112+
if m.s != nil && m.udnEnabled {
113+
udnsMap, err := m.s.GetInterfaceUDNs()
114+
if err != nil {
115+
mtlog.Errorf("failed to get udns to interfaces map : %v", err)
116+
} else {
117+
maps.Copy(udnCache, udnsMap)
118+
mtlog.Tracef("GetInterfaceUDNS map: %v", udnCache)
119+
}
120+
}
108121
for flowKey, flowMetrics := range flows {
109122
forwardingFlows = append(forwardingFlows, model.NewRecord(
110123
flowKey,
111124
&flowMetrics,
112125
currentTime,
113126
uint64(monotonicTimeNow),
114127
m.s,
128+
udnCache,
115129
))
116130
}
117131
m.mapFetcher.DeleteMapsStaleEntries(m.staleEntriesEvictTimeout)

pkg/model/record.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"encoding/binary"
55
"fmt"
66
"io"
7-
"maps"
87
"net"
98
"reflect"
109
"time"
@@ -70,7 +69,6 @@ type Record struct {
7069
// Calculated RTT which is set when record is created by calling NewRecord
7170
TimeFlowRtt time.Duration
7271
NetworkMonitorEventsMD []map[string]string
73-
UdnsCache map[string]string
7472
}
7573

7674
func NewRecord(
@@ -79,6 +77,7 @@ func NewRecord(
7977
currentTime time.Time,
8078
monotonicCurrentTime uint64,
8179
s *ovnobserv.SampleDecoder,
80+
udnsCache map[string]string,
8281
) *Record {
8382
startDelta := time.Duration(monotonicCurrentTime - metrics.StartMonoTimeTs)
8483
endDelta := time.Duration(monotonicCurrentTime - metrics.EndMonoTimeTs)
@@ -90,18 +89,15 @@ func NewRecord(
9089
TimeFlowEnd: currentTime.Add(-endDelta),
9190
AgentIP: agentIP,
9291
}
93-
if s != nil {
94-
record.UdnsCache = make(map[string]string)
95-
}
9692
record.Interfaces = []IntfDirUdn{NewIntfDirUdn(interfaceNamer(int(metrics.IfIndexFirstSeen)),
9793
int(metrics.DirectionFirstSeen),
98-
s, record.UdnsCache)}
94+
udnsCache)}
9995

10096
for i := uint8(0); i < record.Metrics.NbObservedIntf; i++ {
10197
record.Interfaces = append(record.Interfaces, NewIntfDirUdn(
10298
interfaceNamer(int(metrics.ObservedIntf[i])),
10399
int(metrics.ObservedDirection[i]),
104-
s, record.UdnsCache,
100+
udnsCache,
105101
))
106102
}
107103

@@ -153,21 +149,10 @@ type IntfDirUdn struct {
153149
Udn string
154150
}
155151

156-
func NewIntfDirUdn(intf string, dir int, s *ovnobserv.SampleDecoder, cache map[string]string) IntfDirUdn {
152+
func NewIntfDirUdn(intf string, dir int, cache map[string]string) IntfDirUdn {
157153
udn := ""
158-
if s == nil {
159-
return IntfDirUdn{Interface: intf, Direction: dir, Udn: udn}
160-
}
161-
162-
// Load UDN cache if empty
163154
if len(cache) == 0 {
164-
m, err := s.GetInterfaceUDNs()
165-
if err != nil {
166-
recordLog.Errorf("failed to get udns to interfaces map : %v", err)
167-
return IntfDirUdn{Interface: intf, Direction: dir, Udn: udn}
168-
}
169-
maps.Copy(cache, m)
170-
recordLog.Tracef("GetInterfaceUDNS map: %v", cache)
155+
return IntfDirUdn{Interface: intf, Direction: dir, Udn: udn}
171156
}
172157

173158
// Look up the interface in the cache

pkg/model/record_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestParallelNewRecord(t *testing.T) {
9696
wg.Add(1)
9797
go func() {
9898
defer wg.Done()
99-
r := NewRecord(ebpf.BpfFlowId{}, &BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{}}, time.Now(), uint64(monotime.Now()), nil)
99+
r := NewRecord(ebpf.BpfFlowId{}, &BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{}}, time.Now(), uint64(monotime.Now()), nil, map[string]string{})
100100
assert.NotNil(t, r)
101101
}()
102102
}

0 commit comments

Comments
 (0)