Commit a3e5f9a

Author: Mario Macias

NETOBSERV-578: deduplicate flows at agent level (#56)

* NETOBSERV-578: deduplicate flows at agent level
* Addressed code comments

1 parent ec135a6, commit a3e5f9a

File tree: 6 files changed (+239, -3 lines)

cmd/netobserv-ebpf-agent.go

Lines changed: 3 additions & 0 deletions
@@ -33,6 +33,9 @@ func main() {
 			Error("PProf HTTP listener stopped working")
 		}()
 	}
+	if config.DeduperFCExpiry == 0 {
+		config.DeduperFCExpiry = 2 * config.CacheActiveTimeout
+	}
 
 	logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded")
 
docs/config.md

Lines changed: 8 additions & 0 deletions
@@ -21,6 +21,14 @@ The following environment variables are available to configure the NetObserv eBF
   cache. If the accounter reaches the max number of flows, it flushes them to the collector.
 * `CACHE_ACTIVE_TIMEOUT` (default: `5s`). Duration string that specifies the maximum duration
   that flows are kept in the accounting cache before being flushed to the collector.
+* `DEDUPER` (default: `none`, disabled). Accepted values are `none` (disabled) and `firstCome`.
+  When enabled, it will detect duplicate flows (flows that have been detected e.g. through
+  both the physical and a virtual interface).
+  `firstCome` will forward only flows from the first interface the flows are received from.
+* `DEDUPER_FC_EXPIRY` (default: `2 * CACHE_ACTIVE_TIMEOUT`). Specifies the expiry duration of the `firstCome`
+  deduplicator. After a flow hasn't been received for that expiry time, the deduplicator forgets it.
+  That means that a flow from a connection that has been inactive during that period could be
+  forwarded again from a different interface.
 * `LOG_LEVEL` (default: `info`). From more to less verbose: `trace`, `debug`, `info`, `warn`,
   `error`, `fatal`, `panic`.
 * `KAFKA_BROKERS` (required if `EXPORT` is `kafka`). Comma-separated list of the addresses of the
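
For illustration only, here is a minimal, self-contained sketch of how these settings could be consumed. It is not the agent's own startup code (the agent reads them through the struct tags in pkg/agent/config.go below); the github.com/caarlos0/env import is an assumption based on that tag syntax, and the program only demonstrates the documented fallback of DEDUPER_FC_EXPIRY to twice CACHE_ACTIVE_TIMEOUT:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/caarlos0/env/v6" // assumption: an env-to-struct parser matching the tags in pkg/agent/config.go
)

type config struct {
	CacheActiveTimeout time.Duration `env:"CACHE_ACTIVE_TIMEOUT" envDefault:"5s"`
	Deduper            string        `env:"DEDUPER" envDefault:"none"`
	DeduperFCExpiry    time.Duration `env:"DEDUPER_FC_EXPIRY"`
}

func main() {
	var cfg config
	if err := env.Parse(&cfg); err != nil {
		log.Fatal(err)
	}
	// DEDUPER_FC_EXPIRY has no fixed default: when unset, it falls back to
	// 2 * CACHE_ACTIVE_TIMEOUT, as done in cmd/netobserv-ebpf-agent.go above.
	if cfg.DeduperFCExpiry == 0 {
		cfg.DeduperFCExpiry = 2 * cfg.CacheActiveTimeout
	}
	fmt.Printf("deduper=%s expiry=%s\n", cfg.Deduper, cfg.DeduperFCExpiry)
}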

pkg/agent/agent.go

Lines changed: 7 additions & 1 deletion
@@ -225,7 +225,13 @@ func (f *Flows) processRecords(tracedRecords <-chan []*flow.Record) *node.Termin
 	alog.Debug("registering exporter")
 	export := node.AsTerminal(f.exporter)
 	alog.Debug("connecting graph")
-	tracersCollector.SendsTo(export)
+	if f.cfg.Deduper == DeduperFirstCome {
+		deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry))
+		tracersCollector.SendsTo(deduper)
+		deduper.SendsTo(export)
+	} else {
+		tracersCollector.SendsTo(export)
+	}
 	alog.Debug("starting graph")
 	tracersCollector.Start()
 	return export
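
Because Dedupe returns a plain function over channels of record batches, the same stage can be exercised outside the agent's node graph. A minimal sketch of that wiring, assuming the repository's module path is github.com/netobserv/netobserv-ebpf-agent (hypothetical example, not part of this commit):

package main

import (
	"fmt"
	"time"

	"github.com/netobserv/netobserv-ebpf-agent/pkg/flow" // assumption: module path of this repository
)

func main() {
	in := make(chan []*flow.Record, 1)
	out := make(chan []*flow.Record, 1)

	// Dedupe(expiry) returns a func(in <-chan []*flow.Record, out chan<- []*flow.Record)
	// that forwards each flow only from the first interface it was seen on.
	go flow.Dedupe(30*time.Second)(in, out)

	// In the agent this stage sits between the tracers/accounter collector and the exporter;
	// here we just push a batch of empty records for demonstration.
	in <- []*flow.Record{{}, {}}
	fmt.Println("forwarded records:", len(<-out))
}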

pkg/agent/config.go

Lines changed: 15 additions & 2 deletions
@@ -5,8 +5,10 @@ import (
 )
 
 const (
-	ListenPoll  = "poll"
-	ListenWatch = "watch"
+	ListenPoll       = "poll"
+	ListenWatch      = "watch"
+	DeduperNone      = "none"
+	DeduperFirstCome = "firstCome"
 )
 
 type Config struct {
@@ -36,6 +38,17 @@ type Config struct {
 	// CacheActiveTimeout specifies the maximum duration that flows are kept in the accounting
 	// cache before being flushed for its later export
 	CacheActiveTimeout time.Duration `env:"CACHE_ACTIVE_TIMEOUT" envDefault:"5s"`
+	// Deduper specifies the deduper type. Accepted values are "none" (disabled) and "firstCome".
+	// When enabled, it will detect duplicate flows (flows that have been detected e.g. through
+	// both the physical and a virtual interface).
+	// "firstCome" will forward only flows from the first interface the flows are received from.
+	Deduper string `env:"DEDUPER" envDefault:"none"`
+	// DeduperFCExpiry specifies the expiry duration of the flows "firstCome" deduplicator. After
+	// a flow hasn't been received for that expiry time, the deduplicator forgets it. That means
+	// that a flow from a connection that has been inactive during that period could be forwarded
+	// again from a different interface.
+	// If the value is not set, it will default to 2 * CacheActiveTimeout
+	DeduperFCExpiry time.Duration `env:"DEDUPER_FC_EXPIRY"`
 	// Logger level. From more to less verbose: trace, debug, info, warn, error, fatal, panic.
 	LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
 	// Sampling holds the rate at which packets should be sampled and sent to the target collector.

pkg/flow/deduper.go

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
package flow

import (
	"container/list"
	"time"

	"github.com/sirupsen/logrus"
)

var dlog = logrus.WithField("component", "flow/Deduper")
var timeNow = time.Now

// deduperCache implements an LRU cache whose elements are evicted if they haven't been accessed
// during the expire duration.
// It is not safe for concurrent access.
type deduperCache struct {
	expire time.Duration
	// key: RecordKey with the interface and MACs erased, to detect duplicates
	// value: listElement pointing to a struct entry
	ifaces map[RecordKey]*list.Element
	// element: entry structs of the ifaces map ordered by expiry time
	entries *list.List
}

type entry struct {
	key        *RecordKey
	ifIndex    uint32
	expiryTime time.Time
}

// Dedupe receives flows and filters out those belonging to duplicate interfaces. It will forward
// the flows from the first interface coming to it, until that flow expires in the cache
// (no activity for it during the expiration time).
func Dedupe(expireTime time.Duration) func(in <-chan []*Record, out chan<- []*Record) {
	cache := &deduperCache{
		expire:  expireTime,
		entries: list.New(),
		ifaces:  map[RecordKey]*list.Element{},
	}
	return func(in <-chan []*Record, out chan<- []*Record) {
		for records := range in {
			cache.removeExpired()
			fwd := make([]*Record, 0, len(records))
			for _, record := range records {
				if !cache.isDupe(&record.RecordKey) {
					fwd = append(fwd, record)
				}
			}
			if len(fwd) > 0 {
				out <- fwd
			}
		}
	}
}

// isDupe returns whether the passed record duplicates a flow that was already registered
// from another interface
func (c *deduperCache) isDupe(key *RecordKey) bool {
	rk := *key
	rk.IFIndex = 0
	rk.DataLink = DataLink{}
	// If a flow has been accounted previously, whatever its interface was,
	// it updates the expiry time for that flow
	if ele, ok := c.ifaces[rk]; ok {
		fEntry := ele.Value.(*entry)
		fEntry.expiryTime = timeNow().Add(c.expire)
		c.entries.MoveToFront(ele)
		// The input flow is a duplicate if its interface is different to the interface
		// of the non-duplicate flow that was first registered in the cache
		return fEntry.ifIndex != key.IFIndex
	}
	// The flow has not been accounted previously (or was forgotten after expiration)
	// so we register it for that concrete interface
	e := entry{
		key:        &rk,
		ifIndex:    key.IFIndex,
		expiryTime: timeNow().Add(c.expire),
	}
	c.ifaces[rk] = c.entries.PushFront(&e)
	return false
}

func (c *deduperCache) removeExpired() {
	now := timeNow()
	ele := c.entries.Back()
	evicted := 0
	for ele != nil && now.After(ele.Value.(*entry).expiryTime) {
		evicted++
		c.entries.Remove(ele)
		delete(c.ifaces, *ele.Value.(*entry).key)
		ele = c.entries.Back()
	}
	if evicted > 0 {
		dlog.WithFields(logrus.Fields{
			"current":    c.entries.Len(),
			"evicted":    evicted,
			"expiryTime": c.expire,
		}).Debug("entries evicted from the deduper cache")
	}
}

pkg/flow/deduper_test.go

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
package flow

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

var (
	// the same flow from 2 different interfaces
	oneIf1 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{
		EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 123, DstPort: 456},
		DataLink: DataLink{DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}}, IFIndex: 1,
	}, RecordMetrics: RecordMetrics{
		Packets: 2, Bytes: 456,
	}}, Interface: "eth0"}
	oneIf2 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{
		EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 123, DstPort: 456},
		DataLink: DataLink{DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}}, IFIndex: 2,
	}, RecordMetrics: RecordMetrics{
		Packets: 2, Bytes: 456,
	}}, Interface: "123456789"}
	// another flow from 2 different interfaces
	twoIf1 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{
		EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 333, DstPort: 456},
		DataLink: DataLink{DstMac: MacAddr{0x1}, SrcMac: MacAddr{0x1}}, IFIndex: 1,
	}, RecordMetrics: RecordMetrics{
		Packets: 2, Bytes: 456,
	}}, Interface: "eth0"}
	twoIf2 = &Record{RawRecord: RawRecord{RecordKey: RecordKey{
		EthProtocol: 1, Direction: 1, Transport: Transport{SrcPort: 333, DstPort: 456},
		DataLink: DataLink{DstMac: MacAddr{0x2}, SrcMac: MacAddr{0x2}}, IFIndex: 2,
	}, RecordMetrics: RecordMetrics{
		Packets: 2, Bytes: 456,
	}}, Interface: "123456789"}
)

func TestDedupe(t *testing.T) {
	input := make(chan []*Record, 100)
	output := make(chan []*Record, 100)

	go Dedupe(time.Minute)(input, output)

	input <- []*Record{
		oneIf2, // record 1 at interface 2: should be accepted
		twoIf1, // record 2 at interface 1: should be accepted
		oneIf1, // record 1 duplicate at interface 1: should NOT be accepted
		oneIf1, //   (same record key, different interface)
		twoIf2, // record 2 duplicate at interface 2: should NOT be accepted
		oneIf2, // record 1 at interface 2: should be accepted (same record key, same interface)
	}
	deduped := receiveTimeout(t, output)
	assert.Equal(t, []*Record{oneIf2, twoIf1, oneIf2}, deduped)

	// should still accept records with the same key and same interface,
	// and discard those with the same key but a different interface
	input <- []*Record{oneIf1, oneIf2}
	deduped = receiveTimeout(t, output)
	assert.Equal(t, []*Record{oneIf2}, deduped)
}

func TestDedupe_EvictFlows(t *testing.T) {
	tm := &timerMock{now: time.Now()}
	timeNow = tm.Now
	input := make(chan []*Record, 100)
	output := make(chan []*Record, 100)

	go Dedupe(15*time.Second)(input, output)

	// Should only accept records 1 and 2, at interface 1
	input <- []*Record{oneIf1, twoIf1, oneIf2}
	assert.Equal(t, []*Record{oneIf1, twoIf1},
		receiveTimeout(t, output))

	tm.now = tm.now.Add(10 * time.Second)

	// After 10 seconds, it still filters existing flows from different interfaces
	input <- []*Record{oneIf2}
	time.Sleep(100 * time.Millisecond)
	requireNoEviction(t, output)

	tm.now = tm.now.Add(10 * time.Second)

	// Record 2 hasn't been accounted for longer than the expiry time, so it will be accepted again
	// whatever the interface.
	// Since record 1 was accessed 10 seconds ago (less than the expiry time), it will be filtered.
	input <- []*Record{oneIf2, twoIf2, twoIf1}
	assert.Equal(t, []*Record{twoIf2},
		receiveTimeout(t, output))

	tm.now = tm.now.Add(20 * time.Second)

	// when all the records expire, the deduper is reset for that flow
	input <- []*Record{oneIf2, twoIf2}
	assert.Equal(t, []*Record{oneIf2, twoIf2},
		receiveTimeout(t, output))
}

type timerMock struct {
	now time.Time
}

func (tm *timerMock) Now() time.Time {
	return tm.now
}
