Skip to content

Commit 03bb6a3

Browse files
authored
Interface name wasn't populated yet so add func to find ifname (#233)
Signed-off-by: Mohamed Mahmoud <[email protected]>
1 parent 6f1d70a commit 03bb6a3

File tree

6 files changed

+81
-21
lines changed

6 files changed

+81
-21
lines changed

pkg/decode/decode_protobuf.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,17 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
6666
out["Packets"] = flow.Packets
6767
}
6868
var interfaces []interface{}
69-
var directions []interface{}
70-
for _, entry := range flow.GetDupList() {
71-
out["Interfaces"] = append([]interface{}{entry.Interface}, interfaces...)
72-
out["FlowDirections"] = append([]interface{}{int(entry.Direction.Number())}, directions...)
69+
var flowDirections []interface{}
70+
71+
if len(flow.GetDupList()) != 0 {
72+
for _, entry := range flow.GetDupList() {
73+
interfaces = append(interfaces, entry.Interface)
74+
flowDirections = append(flowDirections, entry.Direction)
75+
}
76+
out["Interfaces"] = interfaces
77+
out["FlowDirections"] = flowDirections
7378
}
79+
7480
ethType := ethernet.EtherType(flow.EthProtocol)
7581
if ethType == ethernet.EtherTypeIPv4 || ethType == ethernet.EtherTypeIPv6 {
7682
out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr())

pkg/decode/decode_protobuf_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/netobserv/flowlogs-pipeline/pkg/config"
88
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
9+
910
"github.com/stretchr/testify/assert"
1011
"google.golang.org/protobuf/types/known/durationpb"
1112
"google.golang.org/protobuf/types/known/timestamppb"
@@ -70,7 +71,7 @@ func TestPBFlowToMap(t *testing.T) {
7071
delete(out, "TimeReceived")
7172
assert.Equal(t, config.GenericMap{
7273
"FlowDirection": 1,
73-
"FlowDirections": []interface{}{1},
74+
"FlowDirections": []interface{}{pbflow.Direction(1)},
7475
"Bytes": uint64(456),
7576
"SrcAddr": "1.2.3.4",
7677
"DstAddr": "5.6.7.8",

pkg/exporter/proto.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,16 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
8585
if fr.Metrics.DnsRecord.Latency != 0 {
8686
pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
8787
}
88-
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
89-
for _, m := range fr.DupList {
90-
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
91-
Interface: fr.Interface,
92-
Direction: pbflow.Direction(m[fr.Interface]),
93-
})
88+
if len(fr.DupList) != 0 {
89+
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
90+
for _, m := range fr.DupList {
91+
for key, value := range m {
92+
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
93+
Interface: key,
94+
Direction: pbflow.Direction(value),
95+
})
96+
}
97+
}
9498
}
9599
return &pbflowRecord
96100
}
@@ -142,12 +146,16 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
142146
if fr.Metrics.DnsRecord.Latency != 0 {
143147
pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
144148
}
145-
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
146-
for _, m := range fr.DupList {
147-
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
148-
Interface: fr.Interface,
149-
Direction: pbflow.Direction(m[fr.Interface]),
150-
})
149+
if len(fr.DupList) != 0 {
150+
pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
151+
for _, m := range fr.DupList {
152+
for key, value := range m {
153+
pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
154+
Interface: key,
155+
Direction: pbflow.Direction(value),
156+
})
157+
}
158+
}
151159
}
152160
return &pbflowRecord
153161
}

pkg/flow/deduper.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package flow
22

33
import (
44
"container/list"
5+
"reflect"
56
"time"
67

78
"github.com/sirupsen/logrus"
89

910
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
11+
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
1012
)
1113

1214
var dlog = logrus.WithField("component", "flow/Deduper")
@@ -93,8 +95,17 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
9395
*fwd = append(*fwd, r)
9496
}
9597
if mergeDup {
96-
mergeEntry[r.Interface] = r.Id.Direction
97-
*fEntry.dupList = append(*fEntry.dupList, mergeEntry)
98+
ifName := utils.GetInterfaceName(r.Id.IfIndex)
99+
mergeEntry[ifName] = r.Id.Direction
100+
if dupEntryNew(*fEntry.dupList, mergeEntry) {
101+
*fEntry.dupList = append(*fEntry.dupList, mergeEntry)
102+
dlog.Debugf("merge list entries dump:")
103+
for _, entry := range *fEntry.dupList {
104+
for k, v := range entry {
105+
dlog.Debugf("interface %s dir %d", k, v)
106+
}
107+
}
108+
}
98109
}
99110
return
100111
}
@@ -111,22 +122,34 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
111122
expiryTime: timeNow().Add(c.expire),
112123
}
113124
if mergeDup {
114-
mergeEntry[r.Interface] = r.Id.Direction
125+
ifName := utils.GetInterfaceName(r.Id.IfIndex)
126+
mergeEntry[ifName] = r.Id.Direction
115127
r.DupList = append(r.DupList, mergeEntry)
116128
e.dupList = &r.DupList
117129
}
118130
c.ifaces[rk] = c.entries.PushFront(&e)
119131
*fwd = append(*fwd, r)
120132
}
121133

134+
func dupEntryNew(dupList []map[string]uint8, mergeEntry map[string]uint8) bool {
135+
for _, entry := range dupList {
136+
if reflect.DeepEqual(entry, mergeEntry) {
137+
return false
138+
}
139+
}
140+
return true
141+
}
142+
122143
func (c *deduperCache) removeExpired() {
123144
now := timeNow()
124145
ele := c.entries.Back()
125146
evicted := 0
126147
for ele != nil && now.After(ele.Value.(*entry).expiryTime) {
127148
evicted++
128149
c.entries.Remove(ele)
129-
delete(c.ifaces, *ele.Value.(*entry).key)
150+
fEntry := ele.Value.(*entry)
151+
fEntry.dupList = nil
152+
delete(c.ifaces, *fEntry.key)
130153
ele = c.entries.Back()
131154
}
132155
if evicted > 0 {

pkg/flow/deduper_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/stretchr/testify/assert"
88

99
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
10+
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
1011
)
1112

1213
var (
@@ -151,6 +152,19 @@ func TestDedupeMerge(t *testing.T) {
151152
deduped := receiveTimeout(t, output)
152153
assert.Equal(t, []*Record{oneIf2}, deduped)
153154
assert.Equal(t, 2, len(oneIf2.DupList))
155+
156+
expectedMap := []map[string]uint8{
157+
{
158+
utils.GetInterfaceName(oneIf2.Id.IfIndex): oneIf2.Id.Direction,
159+
},
160+
{
161+
utils.GetInterfaceName(oneIf1.Id.IfIndex): oneIf1.Id.Direction,
162+
},
163+
}
164+
165+
for k, v := range oneIf2.DupList {
166+
assert.Equal(t, expectedMap[k], v)
167+
}
154168
}
155169

156170
type timerMock struct {

pkg/utils/utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,11 @@ func utsnameStr[T int8 | uint8](in []T) string {
9191
}
9292
return string(out)
9393
}
94+
95+
func GetInterfaceName(ifIndex uint32) string {
96+
iface, err := net.InterfaceByIndex(int(ifIndex))
97+
if err != nil {
98+
return ""
99+
}
100+
return iface.Name
101+
}

0 commit comments

Comments
 (0)