Skip to content

Commit 516a29c

Browse files
author
Mario Macias
authored
NETOBSERV-613: decrease premature eviction of eBPF hashmap (#61)
* NETOBSERV-613: decrease premature eviction of eBPF hashmap * fix tests * Fix e2e tests for out-of-order reporting * rm unneeded comment * extend eventually time just in case
1 parent 17f402c commit 516a29c

File tree

9 files changed

+101
-28
lines changed

9 files changed

+101
-28
lines changed

bpf/flow.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ typedef struct flow_metrics_t {
1717
// as output from bpf_ktime_get_ns()
1818
u64 start_mono_time_ts;
1919
u64 end_mono_time_ts;
20+
// The positive errno of a failed map insertion that caused a flow
21+
// to be sent via ringbuffer.
22+
// 0 otherwise
23+
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
24+
u8 errno;
2025
} __attribute__((packed)) flow_metrics;
2126

2227
// Attributes that uniquely identify a flow

bpf/flows.c

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ struct {
5858

5959
// Constant definitions, to be overridden by the invoker
6060
volatile const u32 sampling = 0;
61+
volatile const u8 trace_messages = 0;
6162

6263
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
6364

@@ -184,7 +185,15 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
184185
aggregate_flow->start_mono_time_ts = current_time;
185186
}
186187

187-
bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_EXIST);
188+
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
189+
if (trace_messages && ret != 0) {
190+
// usually error -16 (-EBUSY) is printed here.
191+
// In this case, the flow is dropped, as submitting it to the ringbuffer would cause
192+
// a duplicated UNION of flows (two different flows with partial aggregation of the same packets),
193+
// which can't be deduplicated.
194+
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
195+
bpf_printk("error updating flow %d\n", ret);
196+
}
188197
} else {
189198
// Key does not exist in the map, and will need to create a new entry.
190199
flow_metrics new_flow = {
@@ -196,13 +205,23 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
196205

197206
// even if we know that the entry is new, another CPU might be concurrently inserting a flow
198207
// so we need to specify BPF_ANY
199-
if (bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY) != 0) {
200-
/*
201-
When the map is full, we directly send the flow entry to userspace via ringbuffer,
202-
until space is available in the kernel-side maps
203-
*/
208+
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
209+
if (ret != 0) {
210+
// usually error -16 (-EBUSY) or -7 (-E2BIG) is printed here.
211+
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
212+
// a repeated INTERSECTION of flows (different flows aggregating different packets),
213+
// which can be re-aggregated at userspace.
214+
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
215+
if (trace_messages) {
216+
bpf_printk("error adding flow %d\n", ret);
217+
}
218+
219+
new_flow.errno = -ret;
204220
flow_record *record = bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
205221
if (!record) {
222+
if (trace_messages) {
223+
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
224+
}
206225
return TC_ACT_OK;
207226
}
208227
record->id = id;

e2e/basic/flow_test.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package basic
44

55
import (
66
"context"
7+
"fmt"
78
"os"
89
"path"
910
"strconv"
@@ -99,17 +100,15 @@ func TestSinglePacketFlows(t *testing.T) {
99100
require.NoError(t, err)
100101
logrus.WithFields(logrus.Fields{"stdOut": stdOut, "stdErr": stdErr}).Debug("ping sent")
101102

102-
sent, recv := getPingFlows(t, latestFlowMS)
103+
sent, recv := getPingFlows(t, latestFlowMS, pktLen+ipIcmpHeadersLen)
103104
logrus.Debugf("ping request flow: %#v", sent)
104105
logrus.Debugf("ping response flow: %#v", recv)
105106

106107
assert.Equal(t, pingerIP, sent["SrcAddr"])
107108
assert.Equal(t, serverPodIP, sent["DstAddr"])
108-
assert.EqualValues(t, pktLen+ipIcmpHeadersLen, sent["Bytes"])
109109
assert.EqualValues(t, 1, sent["Packets"])
110110
assert.Equal(t, pingerIP, recv["DstAddr"])
111111
assert.Equal(t, serverPodIP, recv["SrcAddr"])
112-
assert.EqualValues(t, pktLen+ipIcmpHeadersLen, recv["Bytes"])
113112
assert.EqualValues(t, 1, recv["Packets"])
114113

115114
if t.Failed() {
@@ -126,15 +125,22 @@ func TestSinglePacketFlows(t *testing.T) {
126125
).Feature())
127126
}
128127

129-
func getPingFlows(t *testing.T, newerThan time.Time) (sent, recv map[string]interface{}) {
128+
func getPingFlows(t *testing.T, newerThan time.Time, expectedBytes int) (sent, recv map[string]interface{}) {
130129
logrus.Debug("Verifying that the request/return ICMP packets have been captured individually")
131130
var query *tester.LokiQueryResponse
132131
var err error
133-
test.Eventually(t, testTimeout, func(t require.TestingT) {
132+
133+
test.Eventually(t, time.Minute, func(t require.TestingT) {
134134
query, err = testCluster.Loki().
135-
Query(1, `{SrcK8S_OwnerName="pinger",DstK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP
135+
Query(1, fmt.Sprintf(
136+
`{SrcK8S_OwnerName="pinger",DstK8S_OwnerName="server"}`+
137+
`|~"\"Proto\":1[,}]"`+ // Proto 1 == ICMP
138+
`|~"\"Bytes\":%d[,}]"`, expectedBytes))
136139
require.NoError(t, err)
137140
require.NotNil(t, query)
141+
if query == nil {
142+
return
143+
}
138144
require.NotEmpty(t, query.Data.Result)
139145
if len(query.Data.Result) > 0 {
140146
sent, err = query.Data.Result[0].Values[0].FlowData()
@@ -144,11 +150,16 @@ func getPingFlows(t *testing.T, newerThan time.Time) (sent, recv map[string]inte
144150
}
145151
}, test.Interval(time.Second))
146152

147-
test.Eventually(t, testTimeout, func(t require.TestingT) {
153+
test.Eventually(t, time.Minute, func(t require.TestingT) {
148154
query, err = testCluster.Loki().
149-
Query(1, `{DstK8S_OwnerName="pinger",SrcK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP
155+
Query(1, fmt.Sprintf(`{SrcK8S_OwnerName="server",DstK8S_OwnerName="pinger"}`+
156+
`|~"\"Proto\":1[,}]"`+ // Proto 1 == ICMP
157+
`|~"\"Bytes\":%d[,}]"`, expectedBytes))
150158
require.NoError(t, err)
151159
require.NotNil(t, query)
160+
if query == nil {
161+
return
162+
}
152163
require.Len(t, query.Data.Result, 1)
153164
if len(query.Data.Result) > 0 {
154165
recv, err = query.Data.Result[0].Values[0].FlowData()

pkg/agent/agent.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
132132

133133
ingress, egress := flowDirections(cfg)
134134

135+
debug := false
136+
if cfg.LogLevel == logrus.TraceLevel.String() || cfg.LogLevel == logrus.DebugLevel.String() {
137+
debug = true
138+
}
139+
135140
tracer, err := ebpf.NewFlowTracer(
141+
debug,
136142
cfg.Sampling, cfg.CacheMaxFlows, cfg.BuffersLength, cfg.CacheActiveTimeout,
137143
ingress, egress,
138144
interfaceNamer,

pkg/ebpf/bpf_bpfeb.o

1.81 KB
Binary file not shown.

pkg/ebpf/bpf_bpfel.o

1.81 KB
Binary file not shown.

pkg/ebpf/tracer.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strings"
1010
"sync"
1111
"sync/atomic"
12+
"syscall"
1213
"time"
1314

1415
"github.com/cilium/ebpf/ringbuf"
@@ -28,6 +29,7 @@ const (
2829
qdiscType = "clsact"
2930
// constants defined in flows.c as "volatile const"
3031
constSampling = "sampling"
32+
constTraceMessages = "trace_messages"
3133
aggregatedFlowsMap = "aggregated_flows"
3234
)
3335

@@ -50,10 +52,17 @@ type FlowTracer struct {
5052
cacheMaxSize int
5153
enableIngress bool
5254
enableEgress bool
55+
// ringBuf supports atomic logging of ringBuffer metrics
56+
ringBuf struct {
57+
isForwarding int32
58+
forwardedFlows int32
59+
mapFullErrs int32
60+
}
5361
}
5462

5563
// TODO: decouple flowtracer logic from eBPF maps access so we can inject mocks for testing
5664
func NewFlowTracer(
65+
traceMessages bool,
5766
sampling, cacheMaxSize, buffersLength int,
5867
evictionTimeout time.Duration,
5968
ingress, egress bool,
@@ -73,8 +82,13 @@ func NewFlowTracer(
7382
// Resize aggregated flows map according to user-provided configuration
7483
spec.Maps[aggregatedFlowsMap].MaxEntries = uint32(cacheMaxSize)
7584

85+
traceMsgs := 0
86+
if traceMessages {
87+
traceMsgs = 1
88+
}
7689
if err := spec.RewriteConstants(map[string]interface{}{
77-
constSampling: uint32(sampling),
90+
constSampling: uint32(sampling),
91+
constTraceMessages: uint8(traceMsgs),
7892
}); err != nil {
7993
return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
8094
}
@@ -376,8 +390,7 @@ func (m *FlowTracer) Trace(ctx context.Context, forwardFlows chan<- []*flow.Reco
376390
func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlows chan<- []*flow.Record) {
377391
flowAccount := make(chan *flow.RawRecord, m.buffersLength)
378392
go m.accounter.Account(flowAccount, forwardFlows)
379-
isForwarding := int32(0)
380-
forwardedFlows := int32(0)
393+
debugging := logrus.IsLevelEnabled(logrus.DebugLevel)
381394
for {
382395
select {
383396
case <-ctx.Done():
@@ -399,11 +412,15 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow
399412
log.WithError(err).Warn("reading ringbuf event")
400413
continue
401414
}
402-
if logrus.IsLevelEnabled(logrus.DebugLevel) {
403-
m.logRingBufferFlows(&forwardedFlows, &isForwarding)
415+
mapFullError := readFlow.Errno == uint8(syscall.E2BIG)
416+
if debugging {
417+
m.logRingBufferFlows(mapFullError)
404418
}
419+
// if the flow was received due to lack of space in the eBPF map
405420
// forces a flow's eviction to leave room for new flows in the ebpf cache
406-
m.flowsEvictor.Broadcast()
421+
if mapFullError {
422+
m.flowsEvictor.Broadcast()
423+
}
407424

408425
// Will need to send it to accounter anyway to account regardless of complete/ongoing flow
409426
flowAccount <- readFlow
@@ -413,17 +430,28 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlow
413430

414431
// logRingBufferFlows avoids flooding logs on long series of evicted flows by grouping how
415432
// many flows are forwarded
416-
func (m *FlowTracer) logRingBufferFlows(forwardedFlows, isForwarding *int32) {
417-
atomic.AddInt32(forwardedFlows, 1)
418-
if atomic.CompareAndSwapInt32(isForwarding, 0, 1) {
433+
func (m *FlowTracer) logRingBufferFlows(mapFullErr bool) {
434+
atomic.AddInt32(&m.ringBuf.forwardedFlows, 1)
435+
if mapFullErr {
436+
atomic.AddInt32(&m.ringBuf.mapFullErrs, 1)
437+
}
438+
if atomic.CompareAndSwapInt32(&m.ringBuf.isForwarding, 0, 1) {
419439
go func() {
420440
time.Sleep(m.evictionTimeout)
421-
log.WithFields(logrus.Fields{
422-
"flows": atomic.LoadInt32(forwardedFlows),
441+
mfe := atomic.LoadInt32(&m.ringBuf.mapFullErrs)
442+
l := log.WithFields(logrus.Fields{
443+
"flows": atomic.LoadInt32(&m.ringBuf.forwardedFlows),
444+
"mapFullErrs": mfe,
423445
"cacheMaxFlows": m.cacheMaxSize,
424-
}).Debug("received flows via ringbuffer. You might want to increase the CACHE_MAX_FLOWS value")
425-
atomic.StoreInt32(forwardedFlows, 0)
426-
atomic.StoreInt32(isForwarding, 0)
446+
})
447+
if mfe == 0 {
448+
l.Debug("received flows via ringbuffer")
449+
} else {
450+
l.Debug("received flows via ringbuffer. You might want to increase the CACHE_MAX_FLOWS value")
451+
}
452+
atomic.StoreInt32(&m.ringBuf.forwardedFlows, 0)
453+
atomic.StoreInt32(&m.ringBuf.isForwarding, 0)
454+
atomic.StoreInt32(&m.ringBuf.mapFullErrs, 0)
427455
}()
428456
}
429457
}

pkg/flow/record.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ type RecordMetrics struct {
6565
// and monotime.Now() (user space)
6666
StartMonoTimeNs uint64
6767
EndMonoTimeNs uint64
68+
69+
Errno uint8
6870
}
6971

7072
// record structure as parsed from eBPF

pkg/flow/record_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
2626
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 bytes
2727
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_start_time
2828
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_end_time
29+
0x33, // u8 errno
2930
}))
3031
require.NoError(t, err)
3132

@@ -53,6 +54,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
5354
Bytes: 0x1a19181716151413,
5455
StartMonoTimeNs: 0x1a19181716151413,
5556
EndMonoTimeNs: 0x1a19181716151413,
57+
Errno: 0x33,
5658
},
5759
}, *fr)
5860
// assert that IP addresses are interpreted as IPv4 addresses

0 commit comments

Comments
 (0)