Commit f6043d9

Author: Mario Macias

Code improvements (#60)
1 parent c4c9bf4 commit f6043d9

File tree

8 files changed: +98 additions, -41 deletions


go.mod

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ require (
     github.com/cilium/ebpf v0.8.1
     github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
     github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
-    github.com/netobserv/gopipes v0.1.1
+    github.com/netobserv/gopipes v0.2.0
     github.com/paulbellamy/ratecounter v0.2.0
     github.com/segmentio/kafka-go v0.4.32
     github.com/sirupsen/logrus v1.8.1
go.sum

Lines changed: 2 additions & 2 deletions
@@ -372,8 +372,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
-github.com/netobserv/gopipes v0.1.1 h1:f8zJsvnMgRFRa2B+1siwRtW0Y4dqeBROmkcI/HgT1gE=
-github.com/netobserv/gopipes v0.1.1/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI=
+github.com/netobserv/gopipes v0.2.0 h1:CnJQq32+xNuM85eVYy/HOf+StTJdh2K6RdaEg7NAJDg=
+github.com/netobserv/gopipes v0.2.0/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=

pkg/agent/agent.go

Lines changed: 20 additions & 23 deletions
@@ -168,11 +168,10 @@ func flowDirections(cfg *Config) (ingress, egress bool) {
 // until the passed context is canceled
 func (f *Flows) Run(ctx context.Context) error {
     alog.Info("starting Flows agent")
-    tracedRecords, err := f.interfacesManager(ctx)
+    graph, err := f.processPipeline(ctx)
     if err != nil {
-        return err
+        return fmt.Errorf("starting processing graph: %w", err)
     }
-    graph := f.processRecords(tracedRecords)

     alog.Info("Flows agent successfully started")
     <-ctx.Done()
@@ -187,7 +186,7 @@ func (f *Flows) Run(ctx context.Context) error {

 // interfacesManager uses an informer to check new/deleted network interfaces. For each running
 // interface, it registers a flow tracer that will forward new flows to the returned channel
-func (f *Flows) interfacesManager(ctx context.Context) (<-chan []*flow.Record, error) {
+func (f *Flows) interfacesManager(ctx context.Context) (node.InitFunc, error) {
     slog := alog.WithField("function", "interfacesManager")

     slog.Debug("subscribing for network interface events")
@@ -196,19 +195,14 @@ func (f *Flows) interfacesManager(ctx context.Context) (<-chan []*flow.Record, e
         return nil, fmt.Errorf("instantiating interfaces' informer: %w", err)
     }

-    tracedRecords := make(chan []*flow.Record, f.cfg.BuffersLength)
-
     tctx, cancelTracer := context.WithCancel(ctx)
-    go f.tracer.Trace(tctx, tracedRecords)
-
     go func() {
         for {
             select {
             case <-ctx.Done():
                 slog.Debug("canceling flow tracer")
                 cancelTracer()
                 slog.Debug("closing channel and exiting internal goroutine")
-                close(tracedRecords)
                 return
             case event := <-ifaceEvents:
                 slog.WithField("event", event).Debug("received event")
@@ -225,33 +219,36 @@ func (f *Flows) interfacesManager(ctx context.Context) (<-chan []*flow.Record, e
         }
     }()

-    return tracedRecords, nil
+    return func(out chan<- []*flow.Record) {
+        f.tracer.Trace(tctx, out)
+    }, nil
 }

-// processRecords creates the tracers --> accounter --> forwarder Flow processing graph
-func (f *Flows) processRecords(tracedRecords <-chan []*flow.Record) *node.Terminal {
-    // The start node receives Records from the eBPF flow tracers. Currently it is just an external
-    // channel forwarder, as the Pipes library does not yet accept
-    // adding/removing nodes dynamically: https://github.com/mariomac/pipes/issues/5
+// processPipeline creates the tracers --> accounter --> forwarder Flow processing graph
+func (f *Flows) processPipeline(ctx context.Context) (*node.Terminal, error) {
+
     alog.Debug("registering tracers' input")
-    tracersCollector := node.AsInit(func(out chan<- []*flow.Record) {
-        for i := range tracedRecords {
-            out <- i
-        }
-    })
+    tracedRecords, err := f.interfacesManager(ctx)
+    if err != nil {
+        return nil, err
+    }
+    tracersCollector := node.AsInit(tracedRecords)
+
     alog.Debug("registering exporter")
-    export := node.AsTerminal(f.exporter)
+    export := node.AsTerminal(f.exporter,
+        node.ChannelBufferLen(f.cfg.BuffersLength))
     alog.Debug("connecting graph")
     if f.cfg.Deduper == DeduperFirstCome {
-        deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry))
+        deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry),
+            node.ChannelBufferLen(f.cfg.BuffersLength))
         tracersCollector.SendsTo(deduper)
         deduper.SendsTo(export)
     } else {
         tracersCollector.SendsTo(export)
     }
     alog.Debug("starting graph")
     tracersCollector.Start()
-    return export
+    return export, nil
 }

 func (f *Flows) onInterfaceAdded(iface ifaces.Interface) {
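
Note on the new pipeline wiring: processPipeline builds the graph through the gopipes v0.2.0 API seen in the hunks above. node.AsInit wraps the init function returned by interfacesManager, node.AsMiddle and node.AsTerminal wrap the deduper and the exporter, SendsTo connects the stages, Start launches the graph, and the new node.ChannelBufferLen option sizes the channels between nodes. The following is a rough, self-contained sketch of that wiring, assuming the reflection-based v0.2.0 API accepts arbitrarily typed channel functions as it does in the agent; the int-doubling stages are illustrative and are not part of this commit.

package main

import (
	"fmt"

	"github.com/netobserv/gopipes/pkg/node"
)

func main() {
	// init node: generates a few values; its output is closed when the function returns
	start := node.AsInit(func(out chan<- int) {
		for i := 1; i <= 5; i++ {
			out <- i
		}
	})

	// middle node: doubles each value; buffered like the agent's deduper stage
	double := node.AsMiddle(func(in <-chan int, out chan<- int) {
		for v := range in {
			out <- v * 2
		}
	}, node.ChannelBufferLen(16))

	done := make(chan struct{})
	// terminal node: consumes the doubled values
	sink := node.AsTerminal(func(in <-chan int) {
		for v := range in {
			fmt.Println("got", v)
		}
		close(done)
	}, node.ChannelBufferLen(16))

	start.SendsTo(double)
	double.SendsTo(sink)
	start.Start()

	<-done // wait until the terminal node has drained everything
}

In the agent, the init function is the closure returned by interfacesManager (which runs the eBPF tracer), the optional middle stage is flow.Dedupe, and the terminal is the exporter.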

pkg/ebpf/tracer.go

Lines changed: 30 additions & 4 deletions
@@ -8,6 +8,7 @@ import (
     "io/fs"
     "strings"
     "sync"
+    "sync/atomic"
     "time"

     "github.com/cilium/ebpf/ringbuf"
@@ -296,10 +297,13 @@ func (m *FlowTracer) pollAndForwardAggregateFlows(ctx context.Context, forwardFl
         <-ctx.Done()
     }()
     go func() {
+        // flow eviction loop. It just keeps waiting for eviction until someone triggers the
+        // flowsEvictor.Broadcast signal
         for {
             // make sure we only evict once at a time, even if there are multiple eviction signals
             m.flowsEvictor.L.Lock()
             m.flowsEvictor.Wait()
+            tlog.Debug("eviction signal received")
             m.evictFlows(tlog, forwardFlows)
             m.flowsEvictor.L.Unlock()

@@ -315,13 +319,13 @@
     for {
         select {
         case <-ctx.Done():
-            tlog.Debug("evicting flows after context cancelation")
+            tlog.Debug("triggering flow eviction after context cancelation")
             m.flowsEvictor.Broadcast()
             ticker.Stop()
             tlog.Debug("exiting monitor")
             return
        case <-ticker.C:
-            tlog.Debug("evicting flows on timer")
+            tlog.Debug("triggering flow eviction on timer")
             m.flowsEvictor.Broadcast()
        }
    }
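
Note on the eviction signaling above: the flow tracer coordinates eviction through a sync.Cond (m.flowsEvictor). A single goroutine blocks in Wait() and performs the eviction, while the periodic ticker, the context cancelation and the ring-buffer reader only call Broadcast(), so concurrent triggers collapse into at most one eviction at a time. A minimal standalone sketch of that pattern follows; the evictor name, the ticker period and the prints are illustrative, not the agent's code.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// one condition variable guards the whole eviction cycle
	evictor := sync.NewCond(&sync.Mutex{})

	// single eviction loop: broadcasts arriving while an eviction is
	// already running collapse into the next single wake-up
	go func() {
		for {
			evictor.L.Lock()
			evictor.Wait() // sleeps until somebody broadcasts
			fmt.Println("evicting cache")
			evictor.L.Unlock()
		}
	}()

	// periodic trigger, playing the role of the ticker in the diff above
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		evictor.Broadcast() // wake the eviction loop; never blocks the caller
	}
	time.Sleep(50 * time.Millisecond) // give the last eviction time to print
}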
@@ -372,6 +376,8 @@ func (m *FlowTracer) Trace(ctx context.Context, forwardFlows chan<- []*flow.Reco
 func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlows chan<- []*flow.Record) {
     flowAccount := make(chan *flow.RawRecord, m.buffersLength)
     go m.accounter.Account(flowAccount, forwardFlows)
+    isForwarding := int32(0)
+    forwardedFlows := int32(0)
     for {
         select {
         case <-ctx.Done():
@@ -393,8 +399,10 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow
             log.WithError(err).Warn("reading ringbuf event")
             continue
         }
-        log.WithField("direction", readFlow.Direction).
-            Debug("received flow from ringbuffer. Evicting in-memory maps to leave free space")
+        if logrus.IsLevelEnabled(logrus.DebugLevel) {
+            m.logRingBufferFlows(&forwardedFlows, &isForwarding)
+        }
+        // forces a flow's eviction to leave room for new flows in the ebpf cache
         m.flowsEvictor.Broadcast()

         // Will need to send it to accounter anyway to account regardless of complete/ongoing flow
@@ -403,12 +411,30 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow
     }
 }

+// logRingBufferFlows avoids flooding logs on long series of evicted flows by grouping how
+// many flows are forwarded
+func (m *FlowTracer) logRingBufferFlows(forwardedFlows, isForwarding *int32) {
+    atomic.AddInt32(forwardedFlows, 1)
+    if atomic.CompareAndSwapInt32(isForwarding, 0, 1) {
+        go func() {
+            time.Sleep(m.evictionTimeout)
+            log.WithFields(logrus.Fields{
+                "flows":         atomic.LoadInt32(forwardedFlows),
+                "cacheMaxFlows": m.cacheMaxSize,
+            }).Debug("received flows via ringbuffer. You might want to increase the CACHE_MAX_FLOWS value")
+            atomic.StoreInt32(forwardedFlows, 0)
+            atomic.StoreInt32(isForwarding, 0)
+        }()
+    }
+}
+
 // For synchronization purposes, we get/delete a whole snapshot of the flows map.
 // This way we avoid missing packets that could be updated on the
 // ebpf side while we process/aggregate them here
 // Changing this method invocation by BatchLookupAndDelete could improve performance
 // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
 // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
+// Race conditions here causes that some flows are lost in high-load scenarios
 func (m *FlowTracer) lookupAndDeleteFlowsMap() map[flow.RecordKey][]flow.RecordMetrics {
     flowMap := m.objects.AggregatedFlows
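
Note on the log throttling above: logRingBufferFlows combines an atomic counter with a compare-and-swap flag so that, during a burst of ring-buffer events, only the first event wins the CAS and spawns a goroutine that sleeps for the eviction period, emits one aggregated debug line and resets both values. The same idea in isolation (standalone sketch; the 500 ms window and the onEvent/summary wording are illustrative, not the agent's code):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var (
	events      int32 // how many events arrived since the last summary
	summarizing int32 // 0 = no summary goroutine pending, 1 = one is pending
)

// onEvent is called for every incoming event; it only counts and,
// at most once per window, schedules a single summary line.
func onEvent() {
	atomic.AddInt32(&events, 1)
	if atomic.CompareAndSwapInt32(&summarizing, 0, 1) {
		go func() {
			time.Sleep(500 * time.Millisecond) // aggregation window
			fmt.Printf("received %d events in the last window\n", atomic.LoadInt32(&events))
			atomic.StoreInt32(&events, 0)
			atomic.StoreInt32(&summarizing, 0)
		}()
	}
}

func main() {
	for i := 0; i < 1000; i++ {
		onEvent() // a burst of 1000 events produces a single summary line
	}
	time.Sleep(time.Second) // wait for the pending summary to print
}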

pkg/flow/account.go

Lines changed: 4 additions & 1 deletion
@@ -52,6 +52,8 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
             }
             evictingEntries := c.entries
             c.entries = make(map[RecordKey]*RecordMetrics, c.maxEntries)
+            logrus.WithField("flows", len(evictingEntries)).
+                Debug("evicting flows from userspace accounter on timeout")
             go c.evict(evictingEntries, out)
         case record, ok := <-in:
             if !ok {
@@ -69,12 +71,13 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
             if len(c.entries) >= c.maxEntries {
                 evictingEntries := c.entries
                 c.entries = make(map[RecordKey]*RecordMetrics, c.maxEntries)
+                logrus.WithField("flows", len(evictingEntries)).
+                    Debug("evicting flows from userspace accounter after reaching cache max length")
                 go c.evict(evictingEntries, out)
             }
             c.entries[record.RecordKey] = &record.RecordMetrics
         }
     }
-
 }
 }

vendor/github.com/netobserv/gopipes/pkg/node/node.go

Lines changed: 18 additions & 9 deletions
Diff not rendered (vendored file).

vendor/github.com/netobserv/gopipes/pkg/node/options.go

Lines changed: 22 additions & 0 deletions
Diff not rendered (vendored file).
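
The vendored options.go above is where node.ChannelBufferLen comes from; its source is not rendered on this page, but the call sites in pkg/agent/agent.go follow Go's functional-options idiom. A generic sketch of that idiom with hypothetical names (not gopipes' actual implementation):

package main

import "fmt"

// config holds tunables for a hypothetical node; the real gopipes struct may differ.
type config struct {
	channelBufferLen int
}

// Option mutates the config; ChannelBufferLen mirrors the option name used in the diff.
type Option func(*config)

func ChannelBufferLen(n int) Option {
	return func(c *config) { c.channelBufferLen = n }
}

// newNode applies the options over defaults.
func newNode(opts ...Option) *config {
	c := &config{channelBufferLen: 1} // assumed default buffer length
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	n := newNode(ChannelBufferLen(50))
	fmt.Println("channel buffer length:", n.channelBufferLen)
}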

vendor/modules.txt

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ github.com/modern-go/reflect2
 # github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
 ## explicit
 github.com/munnerz/goautoneg
-# github.com/netobserv/gopipes v0.1.1
+# github.com/netobserv/gopipes v0.2.0
 ## explicit; go 1.17
 github.com/netobserv/gopipes/pkg/internal/connect
 github.com/netobserv/gopipes/pkg/internal/refl
