Skip to content

Commit ccb7add

Browse files
author
Mario Macias
authored
NETOBSERV-619: mark duplicate flows (#66)
* NETOBSERV-619: mark duplicate flows * also dedupe flows with same interface, different direction * revert direction deduplication
1 parent f684bc1 commit ccb7add

File tree

15 files changed

+439
-58
lines changed

15 files changed

+439
-58
lines changed

docs/architecture.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ For more info on each component, please check their corresponding Go docs.
99

1010
```mermaid
1111
flowchart TD
12-
1312
E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
14-
E <--> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
13+
style E fill:#990
14+
15+
E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
1516
RB --> |chan *flow.Record| ACC(flow.Accounter)
1617
ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
1718
M --> |"chan []*flow.Record"| DD

docs/config.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ The following environment variables are available to configure the NetObserv eBF
2929
deduplicator. After a flow hasn't been received for that expiry time, the deduplicator forgets it.
3030
That means that a flow from a connection that has been inactive during that period could be
3131
forwarded again from a different interface.
32+
* `DEDUPER_JUST_MARK` (default: `false`) will mark duplicates (adding an extra boolean field)
33+
instead of dropping them.
3234
* `DIRECTION` (default: `both`). Allows selecting which flows to trace according to its direction.
3335
Accepted values are `ingress`, `egress` or `both`.
3436
* `LOG_LEVEL` (default: `info`). From more to less verbose: `trace`, `debug`, `info`, `warn`,

pkg/agent/agent.go

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"time"
99

10+
"github.com/cilium/ebpf/ringbuf"
1011
"github.com/gavv/monotime"
1112
"github.com/netobserv/gopipes/pkg/node"
1213
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
@@ -20,26 +21,60 @@ import (
2021

2122
var alog = logrus.WithField("component", "agent.Flows")
2223

24+
// Status of the agent service. Helps on the health report as well as making some asynchronous
25+
// tests waiting for the agent to accept flows.
26+
type Status int
27+
28+
const (
29+
StatusNotStarted Status = iota
30+
StatusStarting
31+
StatusStarted
32+
StatusStopping
33+
StatusStopped
34+
)
35+
36+
func (s Status) String() string {
37+
switch s {
38+
case StatusNotStarted:
39+
return "StatusNotStarted"
40+
case StatusStarting:
41+
return "StatusStarting"
42+
case StatusStarted:
43+
return "StatusStarted"
44+
case StatusStopping:
45+
return "StatusStopping"
46+
case StatusStopped:
47+
return "StatusStopped"
48+
default:
49+
return "invalid"
50+
}
51+
}
52+
2353
// Flows reporting agent
2454
type Flows struct {
2555
cfg *Config
2656

2757
// input data providers
2858
interfaces ifaces.Informer
2959
filter interfaceFilter
30-
ebpf ebpfRegisterer
60+
ebpf ebpfFlowFetcher
3161

3262
// processing nodes to be wired in the buildAndStartPipeline method
3363
mapTracer *flow.MapTracer
3464
rbTracer *flow.RingBufTracer
3565
accounter *flow.Accounter
3666
exporter flowExporter
67+
68+
status Status
3769
}
3870

39-
// ebpfRegisterer abstracts the interface of ebpf.FlowFetcher to allow dependency injection in tests
40-
type ebpfRegisterer interface {
71+
// ebpfFlowFetcher abstracts the interface of ebpf.FlowFetcher to allow dependency injection in tests
72+
type ebpfFlowFetcher interface {
4173
io.Closer
4274
Register(iface ifaces.Interface) error
75+
76+
LookupAndDeleteMap() map[flow.RecordKey][]flow.RecordMetrics
77+
ReadRingBuf() (ringbuf.Record, error)
4378
}
4479

4580
// flowExporter abstract the ExportFlows' method of exporter.GRPCProto to allow dependency injection
@@ -50,12 +85,6 @@ type flowExporter func(in <-chan []*flow.Record)
5085
func FlowsAgent(cfg *Config) (*Flows, error) {
5186
alog.Info("initializing Flows agent")
5287

53-
// configure allow/deny interfaces filter
54-
filter, err := initInterfaceFilter(cfg.Interfaces, cfg.ExcludeInterfaces)
55-
if err != nil {
56-
return nil, fmt.Errorf("configuring interface filters: %w", err)
57-
}
58-
5988
// configure informer for new interfaces
6089
var informer ifaces.Informer
6190
switch cfg.ListenInterfaces {
@@ -71,22 +100,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
71100
Warn("wrong interface listen method. Using file watcher as default")
72101
informer = ifaces.NewWatcher(cfg.BuffersLength)
73102
}
74-
registerer := ifaces.NewRegisterer(informer, cfg.BuffersLength)
75103

76104
// configure selected exporter
77105
exportFunc, err := buildFlowExporter(cfg)
78106
if err != nil {
79107
return nil, err
80108
}
81109

82-
interfaceNamer := func(ifIndex int) string {
83-
iface, ok := registerer.IfaceNameForIndex(ifIndex)
84-
if !ok {
85-
return "unknown"
86-
}
87-
return iface
88-
}
89-
90110
ingress, egress := flowDirections(cfg)
91111

92112
debug := false
@@ -99,14 +119,39 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
99119
return nil, err
100120
}
101121

122+
return flowsAgent(cfg, informer, fetcher, exportFunc)
123+
}
124+
125+
// flowsAgent is a private constructor with injectable dependencies, usable for tests
126+
func flowsAgent(cfg *Config,
127+
informer ifaces.Informer,
128+
fetcher ebpfFlowFetcher,
129+
exporter flowExporter,
130+
) (*Flows, error) {
131+
// configure allow/deny interfaces filter
132+
filter, err := initInterfaceFilter(cfg.Interfaces, cfg.ExcludeInterfaces)
133+
if err != nil {
134+
return nil, fmt.Errorf("configuring interface filters: %w", err)
135+
}
136+
137+
registerer := ifaces.NewRegisterer(informer, cfg.BuffersLength)
138+
139+
interfaceNamer := func(ifIndex int) string {
140+
iface, ok := registerer.IfaceNameForIndex(ifIndex)
141+
if !ok {
142+
return "unknown"
143+
}
144+
return iface
145+
}
146+
102147
mapTracer := flow.NewMapTracer(fetcher, interfaceNamer, cfg.CacheActiveTimeout)
103148
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout)
104149
accounter := flow.NewAccounter(
105150
cfg.CacheMaxFlows, cfg.CacheActiveTimeout, interfaceNamer, time.Now, monotime.Now)
106151
return &Flows{
107152
ebpf: fetcher,
108-
exporter: exportFunc,
109-
interfaces: informer,
153+
exporter: exporter,
154+
interfaces: registerer,
110155
filter: filter,
111156
cfg: cfg,
112157
mapTracer: mapTracer,
@@ -191,15 +236,18 @@ func buildFlowExporter(cfg *Config) (flowExporter, error) {
191236
// Run a Flows agent. The function will keep running in the same thread
192237
// until the passed context is canceled
193238
func (f *Flows) Run(ctx context.Context) error {
239+
f.status = StatusStarting
194240
alog.Info("starting Flows agent")
195241
graph, err := f.buildAndStartPipeline(ctx)
196242
if err != nil {
197243
return fmt.Errorf("starting processing graph: %w", err)
198244
}
199245

246+
f.status = StatusStarted
200247
alog.Info("Flows agent successfully started")
201248
<-ctx.Done()
202249

250+
f.status = StatusStopping
203251
alog.Info("stopping Flows agent")
204252
if err := f.ebpf.Close(); err != nil {
205253
alog.WithError(err).Warn("eBPF resources not correctly closed")
@@ -208,10 +256,15 @@ func (f *Flows) Run(ctx context.Context) error {
208256
alog.Debug("waiting for all nodes to finish their pending work")
209257
<-graph.Done()
210258

259+
f.status = StatusStopped
211260
alog.Info("Flows agent stopped")
212261
return nil
213262
}
214263

264+
func (f *Flows) Status() Status {
265+
return f.status
266+
}
267+
215268
// interfacesManager uses an informer to check new/deleted network interfaces. For each running
216269
// interface, it registers a flow ebpfFetcher that will forward new flows to the returned channel
217270
// TODO: consider move this method and "onInterfaceAdded" to another type
@@ -279,7 +332,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
279332
rbTracer.SendsTo(accounter)
280333

281334
if f.cfg.Deduper == DeduperFirstCome {
282-
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry),
335+
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark),
283336
node.ChannelBufferLen(f.cfg.BuffersLength))
284337
mapTracer.SendsTo(deduper)
285338
accounter.SendsTo(deduper)

0 commit comments

Comments
 (0)