@@ -3,110 +3,227 @@ package agent
33import (
44 "context"
55 "fmt"
6- "net "
6+ "sync "
77
88 "github.com/netobserv/gopipes/pkg/node"
99 "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
1010 "github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
1111 "github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
12+ "github.com/netobserv/netobserv-ebpf-agent/pkg/ifaces"
1213 "github.com/sirupsen/logrus"
1314)
1415
1516var alog = logrus .WithField ("component" , "agent.Flows" )
1617
1718// Flows reporting agent
1819type Flows struct {
19- tracers map [string ]flowTracer
20- accounter flowAccounter
21- exporter flowExporter
20+ // trMutex provides synchronized access to the tracers map
21+ trMutex sync.Mutex
22+ // tracers stores a flowTracer implementation for each interface in the system, with a
23+ // cancel function that allows stopping it when its interface is deleted
24+ tracers map [ifaces.Name ]cancellableTracer
25+ accounter * flow.Accounter
26+ exporter flowExporter
27+ interfaces ifaces.Informer
28+ filter interfaceFilter
29+ // tracerFactory specifies how to instantiate flowTracer implementations
30+ tracerFactory func (name string , sampling uint32 ) flowTracer
31+ cfg * Config
2232}
2333
34+ // flowTracer abstracts the interface of ebpf.FlowTracer to allow dependency injection in tests
2435type flowTracer interface {
2536 Trace (ctx context.Context , forwardFlows chan <- * flow.Record )
2637 Register () error
2738 Unregister () error
2839}
2940
30- type flowAccounter interface {
31- Account (in <- chan * flow.Record , out chan <- []* flow.Record )
41+ type cancellableTracer struct {
42+ tracer flowTracer
43+ cancel context.CancelFunc
3244}
3345
46+ // flowExporter abstract the ExportFlows' method of exporter.GRPCProto to allow dependency injection
47+ // in tests
3448type flowExporter func (in <- chan []* flow.Record )
3549
3650// FlowsAgent instantiates a new agent, given a configuration.
3751func FlowsAgent (cfg * Config ) (* Flows , error ) {
3852 alog .Info ("initializing Flows agent" )
39- interfaces , err := getInterfaces (cfg , net .Interfaces )
53+
54+ // configure allow/deny interfaces filter
55+ filter , err := initInterfaceFilter (cfg .Interfaces , cfg .ExcludeInterfaces )
4056 if err != nil {
41- return nil , err
57+ return nil , fmt . Errorf ( "configuring interface filters: %w" , err )
4258 }
43- tracers := map [string ]flowTracer {}
44- for iface := range interfaces {
45- tracers [iface ] = ebpf .NewFlowTracer (iface , cfg .Sampling )
59+
60+ // configure informer for new interfaces
61+ var informer ifaces.Informer
62+ switch cfg .ListenInterfaces {
63+ case ListenPoll :
64+ alog .WithField ("period" , cfg .ListenPollPeriod ).
65+ Debug ("listening for new interfaces: use polling" )
66+ informer = ifaces .NewPoller (cfg .ListenPollPeriod , cfg .BuffersLength )
67+ case ListenWatch :
68+ alog .Debug ("listening for new interfaces: use watching" )
69+ informer = ifaces .NewWatcher (cfg .BuffersLength )
70+ default :
71+ alog .WithField ("providedValue" , cfg .ListenInterfaces ).
72+ Warn ("wrong interface listen method. Using file watcher as default" )
73+ informer = ifaces .NewWatcher (cfg .BuffersLength )
4674 }
75+
76+ // configure GRPC+Protobuf exporter
4777 target := fmt .Sprintf ("%s:%d" , cfg .TargetHost , cfg .TargetPort )
4878 grpcExporter , err := exporter .StartGRPCProto (target )
4979 if err != nil {
5080 return nil , err
5181 }
82+
5283 return & Flows {
53- tracers : tracers ,
54- accounter : flow .NewAccounter (cfg .CacheMaxFlows ,
55- cfg .BuffersLength ,
56- cfg .CacheActiveTimeout ),
57- exporter : grpcExporter .ExportFlows ,
84+ tracers : map [ifaces.Name ]cancellableTracer {},
85+ accounter : flow .NewAccounter (cfg .CacheMaxFlows , cfg .BuffersLength , cfg .CacheActiveTimeout ),
86+ exporter : grpcExporter .ExportFlows ,
87+ interfaces : informer ,
88+ filter : filter ,
89+ tracerFactory : func (name string , sampling uint32 ) flowTracer {
90+ return ebpf .NewFlowTracer (name , sampling )
91+ },
92+ cfg : cfg ,
5893 }, nil
5994}
6095
6196// Run a Flows agent. The function will keep running in the same thread
6297// until the passed context is canceled
6398func (f * Flows ) Run (ctx context.Context ) error {
6499 alog .Info ("starting Flows agent" )
65- alog .Debug ("registering flow tracers" )
66- var tracers []* node.Init
67- for i , t := range f .tracers {
68- // make sure the background/deferred functions use this loop's values
69- iface , tracer := i , t
70- tlog := alog .WithField ("iface" , iface )
71- tlog .Debug ("registering flow tracer" )
72- if err := tracer .Register (); err != nil {
73- return err
74- }
75- defer func () {
76- tlog .Debug ("unregistering flow tracer" )
77- if err := tracer .Unregister (); err != nil {
78- tlog .WithError (err ).Warn ("error unregistering flow tracer" )
79- }
80- }()
81- tracers = append (tracers ,
82- node .AsInit (func (out chan <- * flow.Record ) {
83- tracer .Trace (ctx , out )
84- tlog .Debug ("tracer routine ended" )
85- }))
86- }
87- alog .Debug ("registering accounter" )
88- accounter := node .AsMiddle (f .accounter .Account )
89- alog .Debug ("registering exporter" )
90- exporter := node .AsTerminal (f .exporter )
91100
92- alog .Debug ("connecting graph" )
93- for _ , t := range tracers {
94- t .SendsTo (accounter )
95- }
96- accounter .SendsTo (exporter )
97-
98- alog .Debug ("starting graph" )
99- for _ , t := range tracers {
100- t .Start ()
101+ tracedRecords , err := f .interfacesManager (ctx )
102+ if err != nil {
103+ return err
101104 }
105+ graph := f .processRecords (tracedRecords )
102106
103107 alog .Info ("Flows agent successfully started" )
104108 <- ctx .Done ()
105109 alog .Info ("stopping Flows agent" )
106110
107111 alog .Debug ("waiting for all nodes to finish their pending work" )
108- <- exporter .Done ()
112+ <- graph .Done ()
109113
110114 alog .Info ("Flows agent stopped" )
111115 return nil
112116}
117+
118+ // interfacesManager uses an informer to check new/deleted network interfaces. For each running
119+ // interface, it registers a flow tracer that will forward new flows to the returned channel
120+ func (f * Flows ) interfacesManager (ctx context.Context ) (<- chan * flow.Record , error ) {
121+ slog := alog .WithField ("function" , "interfacesManager" )
122+
123+ slog .Debug ("subscribing for network interface events" )
124+ ifaceEvents , err := f .interfaces .Subscribe (ctx )
125+ if err != nil {
126+ return nil , fmt .Errorf ("instantiating interfaces' informer: %w" , err )
127+ }
128+
129+ tracedRecords := make (chan * flow.Record , f .cfg .BuffersLength )
130+ go func () {
131+ for {
132+ select {
133+ case <- ctx .Done ():
134+ slog .Debug ("detaching all the flow tracers before closing the records' channel" )
135+ f .detachAllTracers ()
136+ slog .Debug ("closing channel and exiting internal goroutine" )
137+ close (tracedRecords )
138+ return
139+ case event := <- ifaceEvents :
140+ slog .WithField ("event" , event ).Debug ("received event" )
141+ switch event .Type {
142+ case ifaces .EventAdded :
143+ f .onInterfaceAdded (ctx , event .Interface , tracedRecords )
144+ case ifaces .EventDeleted :
145+ f .onInterfaceDeleted (event .Interface )
146+ default :
147+ slog .WithField ("event" , event ).Warn ("unknown event type" )
148+ }
149+ }
150+ }
151+ }()
152+
153+ return tracedRecords , nil
154+ }
155+
156+ // processRecords creates the tracers --> accounter --> forwarder Flow processing graph
157+ func (f * Flows ) processRecords (tracedRecords <- chan * flow.Record ) * node.Terminal {
158+ // The start node receives Records from the eBPF flow tracers. Currently it is just an external
159+ // channel forwarder, as the Pipes library does not yet accept
160+ // adding/removing nodes dynamically: https://github.com/mariomac/pipes/issues/5
161+ alog .Debug ("registering tracers' input" )
162+ tracersCollector := node .AsInit (func (out chan <- * flow.Record ) {
163+ for i := range tracedRecords {
164+ out <- i
165+ }
166+ })
167+ alog .Debug ("registering accounter" )
168+ accounter := node .AsMiddle (f .accounter .Account )
169+ alog .Debug ("registering exporter" )
170+ export := node .AsTerminal (f .exporter )
171+ alog .Debug ("connecting graph" )
172+ tracersCollector .SendsTo (accounter )
173+ accounter .SendsTo (export )
174+ alog .Debug ("starting graph" )
175+ tracersCollector .Start ()
176+ return export
177+ }
178+
179+ func (f * Flows ) onInterfaceAdded (ctx context.Context , name ifaces.Name , flowsCh chan * flow.Record ) {
180+ // ignore interfaces that do not match the user configuration acceptance/exclusion lists
181+ if ! f .filter .Allowed (name ) {
182+ alog .WithField ("name" , name ).
183+ Debug ("interface does not match the allow/exclusion filters. Ignoring" )
184+ return
185+ }
186+ f .trMutex .Lock ()
187+ defer f .trMutex .Unlock ()
188+ if _ , ok := f .tracers [name ]; ! ok {
189+ alog .WithField ("name" , name ).Info ("interface detected. Registering flow tracer" )
190+ tracer := f .tracerFactory (string (name ), f .cfg .Sampling )
191+ if err := tracer .Register (); err != nil {
192+ alog .WithField ("interface" , name ).WithError (err ).
193+ Warn ("can't register flow tracer. Ignoring" )
194+ return
195+ }
196+ tctx , cancel := context .WithCancel (ctx )
197+ go tracer .Trace (tctx , flowsCh )
198+ f .tracers [name ] = cancellableTracer {
199+ tracer : tracer ,
200+ cancel : cancel ,
201+ }
202+ }
203+ }
204+
205+ func (f * Flows ) onInterfaceDeleted (name ifaces.Name ) {
206+ f .trMutex .Lock ()
207+ defer f .trMutex .Unlock ()
208+ if ft , ok := f .tracers [name ]; ok {
209+ alog .WithField ("name" , name ).Info ("interface deleted. Removing flow tracer" )
210+ ft .cancel ()
211+ delete (f .tracers , name )
212+ // qdiscs, ingress and egress filters are automatically deleted so we don't need to
213+ // specifically detach the tracer
214+ }
215+ }
216+
217+ func (f * Flows ) detachAllTracers () {
218+ f .trMutex .Lock ()
219+ defer f .trMutex .Unlock ()
220+ for name , ft := range f .tracers {
221+ ft .cancel ()
222+ flog := alog .WithField ("name" , name )
223+ flog .Info ("unregistering flow tracer" )
224+ if err := ft .tracer .Unregister (); err != nil {
225+ flog .WithError (err ).Warn ("can't unregister flow tracer" )
226+ }
227+ }
228+ f .tracers = map [ifaces.Name ]cancellableTracer {}
229+ }
0 commit comments