Skip to content

Commit 689b24d

Browse files
authored
Merge pull request #63 from mariomac/capacity-limiter
NETOBSERV-613: drop messages when they accumulate in the exporter
2 parents f63d104 + 1f06575 commit 689b24d

File tree

6 files changed

+174
-9
lines changed

6 files changed

+174
-9
lines changed

docs/architecture.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@ flowchart TD
1212
1313
E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
1414
E <--> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
15-
RB --> |*flow.Record| ACC(flow.Accounter)
16-
17-
ACC --> |"[]*flow.Record"| DD(flow.Deduper)
18-
M --> |"[]*flow.Record"| DD
15+
RB --> |chan *flow.Record| ACC(flow.Accounter)
16+
ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
17+
M --> |"chan []*flow.Record"| DD
1918
2019
subgraph Optional
2120
DD
2221
end
2322
24-
DD --> |"[]*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
23+
DD --> |"chan []*flow.Record"| CL(flow.CapacityLimiter)
24+
25+
CL --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
2526
```

docs/config.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ so no user should need to change them.
5959

6060
* `BUFFERS_LENGTH` (default: `50`). Length of the internal communication channels between the different
6161
processing stages.
62+
* `EXPORTER_BUFFER_LENGTH` (default: value of `BUFFERS_LENGTH`). Length of the buffer
63+
of flow batches (not individual flows) that can be accumulated before the Kafka or GRPC exporter.
64+
When this buffer is full (e.g. because the Kafka or GRPC endpoint is slow), incoming flow batches
65+
will be dropped. If unset, its value is the same as the `BUFFERS_LENGTH` property.
6266
* `KAFKA_ASYNC` (default: `true`). If `true`, the message writing process will never block. It also
6367
means that errors are ignored since the caller will not receive the returned value.
6468
* `LISTEN_INTERFACES` (default: `watch`). Mechanism used by the agent to listen for added or removed

pkg/agent/agent.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,21 +265,30 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal, erro
265265
accounter := node.AsMiddle(f.accounter.Account,
266266
node.ChannelBufferLen(f.cfg.BuffersLength))
267267

268-
export := node.AsTerminal(f.exporter,
268+
limiter := node.AsMiddle((&flow.CapacityLimiter{}).Limit,
269269
node.ChannelBufferLen(f.cfg.BuffersLength))
270270

271+
ebl := f.cfg.ExporterBufferLength
272+
if ebl == 0 {
273+
ebl = f.cfg.BuffersLength
274+
}
275+
276+
export := node.AsTerminal(f.exporter,
277+
node.ChannelBufferLen(ebl))
278+
271279
rbTracer.SendsTo(accounter)
272280

273281
if f.cfg.Deduper == DeduperFirstCome {
274282
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry),
275283
node.ChannelBufferLen(f.cfg.BuffersLength))
276284
mapTracer.SendsTo(deduper)
277285
accounter.SendsTo(deduper)
278-
deduper.SendsTo(export)
286+
deduper.SendsTo(limiter)
279287
} else {
280-
mapTracer.SendsTo(export)
281-
accounter.SendsTo(export)
288+
mapTracer.SendsTo(limiter)
289+
accounter.SendsTo(limiter)
282290
}
291+
limiter.SendsTo(export)
283292
alog.Debug("starting graph")
284293
mapTracer.Start()
285294
rbTracer.Start()

pkg/agent/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ type Config struct {
3535
// BuffersLength establishes the length of communication channels between the different processing
3636
// stages
3737
BuffersLength int `env:"BUFFERS_LENGTH" envDefault:"50"`
38+
// ExporterBufferLength establishes the length of the buffer of flow batches (not individual flows)
39+
// that can be accumulated before the Kafka or GRPC exporter. When this buffer is full (e.g.
40+
// because the Kafka or GRPC endpoint is slow), incoming flow batches will be dropped. If unset,
41+
// its value is the same as the BUFFERS_LENGTH property.
42+
ExporterBufferLength int `env:"EXPORTER_BUFFER_LENGTH"`
3843
// CacheMaxFlows specifies how many flows can be accumulated in the accounting cache before
3944
// being flushed for its later export
4045
CacheMaxFlows int `env:"CACHE_MAX_FLOWS" envDefault:"5000"`

pkg/flow/limiter.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package flow
2+
3+
import (
	"sync/atomic"
	"time"

	"github.com/sirupsen/logrus"
)
8+
9+
const (
	// initialLogPeriod is the interval the dropped-flows reporter waits before its first check,
	// and the interval it returns to after a period in which nothing was dropped.
	initialLogPeriod = time.Minute
	// maxLogPeriod is the upper bound for the reporting interval once it starts backing off.
	maxLogPeriod = time.Hour
)
11+
12+
var cllog = logrus.WithField("component", "capacity.Limiter")
13+
14+
// CapacityLimiter forwards the flows between two nodes but checks the status of the destination
15+
// node's buffered channel. If it is already full, it drops the incoming flow and periodically will
16+
// log a message about the number of lost flows.
17+
type CapacityLimiter struct {
18+
droppedFlows int
19+
}
20+
21+
func (c *CapacityLimiter) Limit(in <-chan []*Record, out chan<- []*Record) {
22+
go c.logDroppedFlows()
23+
for i := range in {
24+
if len(out) < cap(out) {
25+
out <- i
26+
} else {
27+
c.droppedFlows += len(i)
28+
}
29+
}
30+
}
31+
32+
func (c *CapacityLimiter) logDroppedFlows() {
33+
logPeriod := initialLogPeriod
34+
debugging := logrus.IsLevelEnabled(logrus.DebugLevel)
35+
for {
36+
time.Sleep(logPeriod)
37+
38+
// a race condition might happen in this counter but it's not important as it's just for
39+
// logging purposes
40+
df := c.droppedFlows
41+
if df > 0 {
42+
c.droppedFlows = 0
43+
cllog.Warnf("%d flows were dropped during the last %s because the agent is forwarding "+
44+
"more flows than the remote ingestor is able to process. You might "+
45+
"want to increase the CACHE_MAX_FLOWS and CACHE_ACTIVE_TIMEOUT property",
46+
df, logPeriod)
47+
48+
// if not debug logs, backoff to avoid flooding the log with warning messages
49+
if !debugging && logPeriod < maxLogPeriod {
50+
logPeriod *= 2
51+
if logPeriod > maxLogPeriod {
52+
logPeriod = maxLogPeriod
53+
}
54+
}
55+
} else {
56+
logPeriod = initialLogPeriod
57+
}
58+
}
59+
}

pkg/flow/limiter_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package flow
2+
3+
import (
4+
"strconv"
5+
"testing"
6+
7+
"github.com/netobserv/gopipes/pkg/node"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
// limiterLen is the buffer length given to the terminal node of the test pipeline built by
// capacityLimiterPipe.
const limiterLen = 50
13+
14+
func TestCapacityLimiter_NoDrop(t *testing.T) {
15+
// GIVEN a limiter-enabled pipeline
16+
pipeIn, pipeOut := capacityLimiterPipe()
17+
18+
// WHEN it buffers less elements than it's maximum capacity
19+
for i := 0; i < 33; i++ {
20+
pipeIn <- []*Record{{Interface: strconv.Itoa(i)}}
21+
}
22+
23+
// THEN it is able to retrieve all the buffered elements
24+
for i := 0; i < 33; i++ {
25+
elem := <-pipeOut
26+
require.Len(t, elem, 1)
27+
assert.Equal(t, strconv.Itoa(i), elem[0].Interface)
28+
}
29+
30+
// AND not a single extra element
31+
select {
32+
case elem := <-pipeOut:
33+
assert.Failf(t, "unexpected element", "%#v", elem)
34+
default:
35+
// ok!
36+
}
37+
}
38+
39+
func TestCapacityLimiter_Drop(t *testing.T) {
40+
// GIVEN a limiter-enabled pipeline
41+
pipeIn, pipeOut := capacityLimiterPipe()
42+
43+
// WHEN it receives more elements than its maximum capacity
44+
// (it's not blocking)
45+
for i := 0; i < limiterLen*2; i++ {
46+
pipeIn <- []*Record{{Interface: strconv.Itoa(i)}}
47+
}
48+
49+
// THEN it is only able to retrieve all the nth first buffered elements
50+
// (plus the single element that is buffered in the output channel)
51+
for i := 0; i < limiterLen+1; i++ {
52+
elem := <-pipeOut
53+
require.Len(t, elem, 1)
54+
assert.Equal(t, strconv.Itoa(i), elem[0].Interface)
55+
}
56+
57+
// BUT not a single extra element
58+
select {
59+
case elem := <-pipeOut:
60+
assert.Failf(t, "unexpected element", "%#v", elem)
61+
default:
62+
// ok!
63+
}
64+
}
65+
66+
func capacityLimiterPipe() (in chan<- []*Record, out <-chan []*Record) {
67+
inCh, outCh := make(chan []*Record), make(chan []*Record)
68+
69+
init := node.AsInit(func(initOut chan<- []*Record) {
70+
for i := range inCh {
71+
initOut <- i
72+
}
73+
})
74+
limiter := node.AsMiddle((&CapacityLimiter{}).Limit)
75+
term := node.AsTerminal(func(termIn <-chan []*Record) {
76+
for i := range termIn {
77+
outCh <- i
78+
}
79+
}, node.ChannelBufferLen(limiterLen))
80+
81+
init.SendsTo(limiter)
82+
limiter.SendsTo(term)
83+
84+
init.Start()
85+
86+
return inCh, outCh
87+
}

0 commit comments

Comments
 (0)