@@ -4,8 +4,10 @@ import (
44 "context"
55 "errors"
66 "fmt"
7+ "io"
78 "time"
89
10+ "github.com/gavv/monotime"
911 "github.com/netobserv/gopipes/pkg/node"
1012 "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
1113 "github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
@@ -20,16 +22,23 @@ var alog = logrus.WithField("component", "agent.Flows")
2022
2123// Flows reporting agent
2224type Flows struct {
23- exporter flowExporter
25+ cfg * Config
26+
27+ // input data providers
2428 interfaces ifaces.Informer
2529 filter interfaceFilter
26- tracer flowTracer
27- cfg * Config
30+ ebpf ebpfRegisterer
31+
32+ // processing nodes to be wired in the buildAndStartPipeline method
33+ mapTracer * flow.MapTracer
34+ rbTracer * flow.RingBufTracer
35+ accounter * flow.Accounter
36+ exporter flowExporter
2837}
2938
30- // flowTracer abstracts the interface of ebpf.FlowTracer to allow dependency injection in tests
31- type flowTracer interface {
32- Trace ( ctx context. Context , forwardFlows chan <- [] * flow. Record )
39+ // ebpfRegisterer abstracts the interface of ebpf.FlowFetcher to allow dependency injection in tests
40+ type ebpfRegisterer interface {
41+ io. Closer
3342 Register (iface ifaces.Interface ) error
3443}
3544
@@ -65,7 +74,62 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
6574 registerer := ifaces .NewRegisterer (informer , cfg .BuffersLength )
6675
6776 // configure selected exporter
68- var exportFunc flowExporter
77+ exportFunc , err := buildFlowExporter (cfg )
78+ if err != nil {
79+ return nil , err
80+ }
81+
82+ interfaceNamer := func (ifIndex int ) string {
83+ iface , ok := registerer .IfaceNameForIndex (ifIndex )
84+ if ! ok {
85+ return "unknown"
86+ }
87+ return iface
88+ }
89+
90+ ingress , egress := flowDirections (cfg )
91+
92+ debug := false
93+ if cfg .LogLevel == logrus .TraceLevel .String () || cfg .LogLevel == logrus .DebugLevel .String () {
94+ debug = true
95+ }
96+
97+ fetcher , err := ebpf .NewFlowFetcher (debug , cfg .Sampling , cfg .CacheMaxFlows , ingress , egress )
98+ if err != nil {
99+ return nil , err
100+ }
101+
102+ mapTracer := flow .NewMapTracer (fetcher , interfaceNamer , cfg .CacheActiveTimeout )
103+ rbTracer := flow .NewRingBufTracer (fetcher , mapTracer , cfg .CacheActiveTimeout )
104+ accounter := flow .NewAccounter (
105+ cfg .CacheMaxFlows , cfg .CacheActiveTimeout , interfaceNamer , time .Now , monotime .Now )
106+ return & Flows {
107+ ebpf : fetcher ,
108+ exporter : exportFunc ,
109+ interfaces : informer ,
110+ filter : filter ,
111+ cfg : cfg ,
112+ mapTracer : mapTracer ,
113+ rbTracer : rbTracer ,
114+ accounter : accounter ,
115+ }, nil
116+ }
117+
118+ func flowDirections (cfg * Config ) (ingress , egress bool ) {
119+ switch cfg .Direction {
120+ case DirectionIngress :
121+ return true , false
122+ case DirectionEgress :
123+ return false , true
124+ case DirectionBoth :
125+ return true , true
126+ default :
127+ alog .Warnf ("unknown DIRECTION %q. Tracing both ingress and egress traffic" , cfg .Direction )
128+ return true , true
129+ }
130+ }
131+
132+ func buildFlowExporter (cfg * Config ) (flowExporter , error ) {
69133 switch cfg .Export {
70134 case "grpc" :
71135 if cfg .TargetHost == "" || cfg .TargetPort == 0 {
@@ -77,7 +141,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
77141 if err != nil {
78142 return nil , err
79143 }
80- exportFunc = grpcExporter .ExportFlows
144+ return grpcExporter .ExportFlows , nil
81145 case "kafka" :
82146 if len (cfg .KafkaBrokers ) == 0 {
83147 return nil , errors .New ("at least one Kafka broker is needed" )
@@ -95,7 +159,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
95159 }
96160 transport .TLS = tlsConfig
97161 }
98- exportFunc = (& exporter.KafkaProto {
162+ return (& exporter.KafkaProto {
99163 Writer : & kafkago.Writer {
100164 Addr : kafkago .TCP (cfg .KafkaBrokers ... ),
101165 Topic : cfg .KafkaTopic ,
@@ -117,71 +181,29 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
117181 Transport : & transport ,
118182 Balancer : & kafkago.RoundRobin {},
119183 },
120- }).ExportFlows
184+ }).ExportFlows , nil
121185 default :
122186 return nil , fmt .Errorf ("wrong export type %s. Admitted values are grpc, kafka" , cfg .Export )
123187 }
124188
125- interfaceNamer := func (ifIndex int ) string {
126- iface , ok := registerer .IfaceNameForIndex (ifIndex )
127- if ! ok {
128- return "unknown"
129- }
130- return iface
131- }
132-
133- ingress , egress := flowDirections (cfg )
134-
135- debug := false
136- if cfg .LogLevel == logrus .TraceLevel .String () || cfg .LogLevel == logrus .DebugLevel .String () {
137- debug = true
138- }
139-
140- tracer , err := ebpf .NewFlowTracer (
141- debug ,
142- cfg .Sampling , cfg .CacheMaxFlows , cfg .BuffersLength , cfg .CacheActiveTimeout ,
143- ingress , egress ,
144- interfaceNamer ,
145- )
146- if err != nil {
147- return nil , err
148- }
149-
150- return & Flows {
151- tracer : tracer ,
152- exporter : exportFunc ,
153- interfaces : informer ,
154- filter : filter ,
155- cfg : cfg ,
156- }, nil
157- }
158-
159- func flowDirections (cfg * Config ) (ingress , egress bool ) {
160- switch cfg .Direction {
161- case DirectionIngress :
162- return true , false
163- case DirectionEgress :
164- return false , true
165- case DirectionBoth :
166- return true , true
167- default :
168- alog .Warnf ("unknown DIRECTION %q. Tracing both ingress and egress traffic" , cfg .Direction )
169- return true , true
170- }
171189}
172190
173191// Run a Flows agent. The function will keep running in the same thread
174192// until the passed context is canceled
175193func (f * Flows ) Run (ctx context.Context ) error {
176194 alog .Info ("starting Flows agent" )
177- graph , err := f .processPipeline (ctx )
195+ graph , err := f .buildAndStartPipeline (ctx )
178196 if err != nil {
179197 return fmt .Errorf ("starting processing graph: %w" , err )
180198 }
181199
182200 alog .Info ("Flows agent successfully started" )
183201 <- ctx .Done ()
202+
184203 alog .Info ("stopping Flows agent" )
204+ if err := f .ebpf .Close (); err != nil {
205+ alog .WithError (err ).Warn ("eBPF resources not correctly closed" )
206+ }
185207
186208 alog .Debug ("waiting for all nodes to finish their pending work" )
187209 <- graph .Done ()
@@ -191,24 +213,22 @@ func (f *Flows) Run(ctx context.Context) error {
191213}
192214
193215// interfacesManager uses an informer to check new/deleted network interfaces. For each running
194- // interface, it registers a flow tracer that will forward new flows to the returned channel
195- func (f * Flows ) interfacesManager (ctx context.Context ) (node.InitFunc , error ) {
216+ // interface, it registers a flow ebpfFetcher that will forward new flows to the returned channel
217+ // TODO: consider move this method and "onInterfaceAdded" to another type
218+ func (f * Flows ) interfacesManager (ctx context.Context ) error {
196219 slog := alog .WithField ("function" , "interfacesManager" )
197220
198221 slog .Debug ("subscribing for network interface events" )
199222 ifaceEvents , err := f .interfaces .Subscribe (ctx )
200223 if err != nil {
201- return nil , fmt .Errorf ("instantiating interfaces' informer: %w" , err )
224+ return fmt .Errorf ("instantiating interfaces' informer: %w" , err )
202225 }
203226
204- tctx , cancelTracer := context .WithCancel (ctx )
205227 go func () {
206228 for {
207229 select {
208230 case <- ctx .Done ():
209- slog .Debug ("canceling flow tracer" )
210- cancelTracer ()
211- slog .Debug ("closing channel and exiting internal goroutine" )
231+ slog .Debug ("stopping interfaces' listener" )
212232 return
213233 case event := <- ifaceEvents :
214234 slog .WithField ("event" , event ).Debug ("received event" )
@@ -217,43 +237,52 @@ func (f *Flows) interfacesManager(ctx context.Context) (node.InitFunc, error) {
217237 f .onInterfaceAdded (event .Interface )
218238 case ifaces .EventDeleted :
219239 // qdiscs, ingress and egress filters are automatically deleted so we don't need to
220- // specifically detach them from the tracer
240+ // specifically detach them from the ebpfFetcher
221241 default :
222242 slog .WithField ("event" , event ).Warn ("unknown event type" )
223243 }
224244 }
225245 }
226246 }()
227247
228- return func (out chan <- []* flow.Record ) {
229- f .tracer .Trace (tctx , out )
230- }, nil
248+ return nil
231249}
232250
233- // processPipeline creates the tracers --> accounter --> forwarder Flow processing graph
234- func (f * Flows ) processPipeline (ctx context.Context ) (* node.Terminal , error ) {
251+ // buildAndStartPipeline creates the ETL flow processing graph.
252+ // For a more visual view, check the docs/architecture.md document.
253+ func (f * Flows ) buildAndStartPipeline (ctx context.Context ) (* node.Terminal , error ) {
235254
236- alog .Debug ("registering tracers' input " )
237- tracedRecords , err := f .interfacesManager (ctx )
255+ alog .Debug ("registering interfaces' listener in background " )
256+ err := f .interfacesManager (ctx )
238257 if err != nil {
239258 return nil , err
240259 }
241- tracersCollector := node .AsInit (tracedRecords )
242260
243- alog .Debug ("registering exporter" )
261+ alog .Debug ("connecting flows' processing graph" )
262+ mapTracer := node .AsInit (f .mapTracer .TraceLoop (ctx ))
263+ rbTracer := node .AsInit (f .rbTracer .TraceLoop (ctx ))
264+
265+ accounter := node .AsMiddle (f .accounter .Account ,
266+ node .ChannelBufferLen (f .cfg .BuffersLength ))
267+
244268 export := node .AsTerminal (f .exporter ,
245269 node .ChannelBufferLen (f .cfg .BuffersLength ))
246- alog .Debug ("connecting graph" )
270+
271+ rbTracer .SendsTo (accounter )
272+
247273 if f .cfg .Deduper == DeduperFirstCome {
248274 deduper := node .AsMiddle (flow .Dedupe (f .cfg .DeduperFCExpiry ),
249275 node .ChannelBufferLen (f .cfg .BuffersLength ))
250- tracersCollector .SendsTo (deduper )
276+ mapTracer .SendsTo (deduper )
277+ accounter .SendsTo (deduper )
251278 deduper .SendsTo (export )
252279 } else {
253- tracersCollector .SendsTo (export )
280+ mapTracer .SendsTo (export )
281+ accounter .SendsTo (export )
254282 }
255283 alog .Debug ("starting graph" )
256- tracersCollector .Start ()
284+ mapTracer .Start ()
285+ rbTracer .Start ()
257286 return export , nil
258287}
259288
@@ -264,10 +293,10 @@ func (f *Flows) onInterfaceAdded(iface ifaces.Interface) {
264293 Debug ("interface does not match the allow/exclusion filters. Ignoring" )
265294 return
266295 }
267- alog .WithField ("interface" , iface ).Info ("interface detected. Registering flow tracer " )
268- if err := f .tracer .Register (iface ); err != nil {
296+ alog .WithField ("interface" , iface ).Info ("interface detected. Registering flow ebpfFetcher " )
297+ if err := f .ebpf .Register (iface ); err != nil {
269298 alog .WithField ("interface" , iface ).WithError (err ).
270- Warn ("can't register flow tracer . Ignoring" )
299+ Warn ("can't register flow ebpfFetcher . Ignoring" )
271300 return
272301 }
273302}
0 commit comments