Commit b3c3732

Author: Mario Macias
NETOBSERV-560: fix fragmentation of reported flows (#51)
1 parent 67ad201 · commit b3c3732
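Summary: flows could be reported fragmented because the per-CPU eBPF hash map races across CPUs. On the kernel side, new entries are now inserted with BPF_ANY instead of BPF_NOEXIST, so a concurrent insert from another CPU no longer makes the update fail; on the userspace side, the tracer re-joins the metrics of keys that show up more than once during map iteration instead of forwarding them as separate flow records.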

File tree: 5 files changed, +18 -12 lines

Makefile
Lines changed: 1 addition & 0 deletions

```diff
@@ -126,6 +126,7 @@ image-push: ## Push OCI image with the manager.
 .PHONY: tests-e2e
 .ONESHELL:
 tests-e2e: prereqs
+	go clean -testcache
 	# making the local agent image available to kind in two ways, so it will work in different
 	# environments: (1) as image tagged in the local repository (2) as image archive.
 	$(OCI_BIN) build . -t localhost/ebpf-agent:test
```
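The added `go clean -testcache` matters because Go caches successful test results: without it, re-running `make tests-e2e` after rebuilding the agent image could report stale cached outcomes instead of actually exercising the new image.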

bpf/flows.c
Lines changed: 6 additions & 2 deletions

```diff
@@ -171,6 +171,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
     id.if_index = skb->ifindex;
     id.direction = direction;
 
+    // TODO: we need to add a spinlock here when we deprecate versions prior to 5.1, or provide
+    // a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
     flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
     if (aggregate_flow != NULL) {
         aggregate_flow->packets += 1;
@@ -186,8 +188,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
             .start_mono_time_ts = current_time,
             .end_mono_time_ts = current_time,
         };
-
-        if (bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST) != 0) {
+
+        // even if we know that the entry is new, another CPU might be concurrently inserting a flow,
+        // so we need to specify BPF_ANY
+        if (bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY) != 0) {
             /*
                 When the map is full, we directly send the flow entry to userspace via ringbuffer,
                 until space is available in the kernel-side maps
```
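Why BPF_ANY: the bpf_map_lookup_elem/bpf_map_update_elem pair above is not atomic across CPUs, so another CPU can insert the same flow id between this CPU's failed lookup and its update. With BPF_NOEXIST that update fails with EEXIST and the flow falls through to the ringbuffer path, fragmenting it. Below is a minimal userspace sketch of the same flag semantics using cilium/ebpf, the library the Go tracer builds on; the map spec is hypothetical and only serves to demonstrate the flags, and creating maps needs CAP_BPF/root.

```go
package main

import (
	"fmt"

	"github.com/cilium/ebpf"
)

func main() {
	// Hypothetical map spec, only to demonstrate the update-flag semantics.
	m, err := ebpf.NewMap(&ebpf.MapSpec{
		Type:       ebpf.Hash,
		KeySize:    4, // uint32 key
		ValueSize:  8, // uint64 value
		MaxEntries: 8,
	})
	if err != nil {
		panic(err) // needs CAP_BPF/root and a reasonably recent kernel
	}
	defer m.Close()

	key, val := uint32(1), uint64(100)

	// BPF_NOEXIST (UpdateNoExist) succeeds on a fresh key...
	fmt.Println(m.Update(&key, &val, ebpf.UpdateNoExist)) // <nil>

	// ...but fails once the key exists: the situation another CPU can
	// create between flow_monitor's lookup and its update.
	fmt.Println(m.Update(&key, &val, ebpf.UpdateNoExist)) // "key already exists" error

	// BPF_ANY (UpdateAny) inserts or overwrites, so a concurrent insert
	// no longer turns into a failed update.
	fmt.Println(m.Update(&key, &val, ebpf.UpdateAny)) // <nil>
}
```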

pkg/ebpf/bpf_bpfeb.o
Binary file not shown.

pkg/ebpf/bpf_bpfel.o
Binary file not shown.

pkg/ebpf/tracer.go
Lines changed: 11 additions & 10 deletions

```diff
@@ -46,6 +46,7 @@ type FlowTracer struct {
 	// manages the access to the eviction routines, avoiding two evictions happening at the same time
 	flowsEvictor   *sync.Cond
 	lastEvictionNs uint64
+	cacheMaxSize   int
 }
 
 // TODO: decouple flowtracer logic from eBPF maps access so we can inject mocks for testing
@@ -94,6 +95,7 @@ func NewFlowTracer(
 		interfaceNamer: namer,
 		flowsEvictor:   sync.NewCond(&sync.Mutex{}),
 		lastEvictionNs: uint64(monotime.Now()),
+		cacheMaxSize:   cacheMaxSize,
 	}, nil
 }
 
@@ -300,12 +302,9 @@ func (m *FlowTracer) evictFlows(tlog *logrus.Entry, forwardFlows chan<- []*flow.Record) {
 	monotonicTimeNow := monotime.Now()
 	currentTime := time.Now()
 
-	mapKeys, mapValues := m.lookupAndDeleteMapKeysValues()
-
 	var forwardingFlows []*flow.Record
 	laterFlowNs := uint64(0)
-	for nf, flowKey := range mapKeys {
-		flowMetrics := mapValues[nf]
+	for flowKey, flowMetrics := range m.lookupAndDeleteFlowsMap() {
 		aggregatedMetrics := m.aggregate(flowMetrics)
 		// we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored)
 		if aggregatedMetrics.EndMonoTimeNs == 0 {
@@ -380,12 +379,12 @@ func (m *FlowTracer) listenAndForwardRingBuffer(ctx context.Context, forwardFlows chan<- []*flow.Record) {
 // Replacing this method invocation with BatchLookupAndDelete could improve performance
 // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
 // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
-func (m *FlowTracer) lookupAndDeleteMapKeysValues() ([]flow.RecordKey, [][]flow.RecordMetrics) {
+func (m *FlowTracer) lookupAndDeleteFlowsMap() map[flow.RecordKey][]flow.RecordMetrics {
 	flowMap := m.objects.AggregatedFlows
-	var flowKeys []flow.RecordKey
-	var flowsValues [][]flow.RecordMetrics
 
 	iterator := flowMap.Iterate()
+	flows := make(map[flow.RecordKey][]flow.RecordMetrics, m.cacheMaxSize)
+
 	id := flow.RecordKey{}
 	var metrics []flow.RecordMetrics
 	// Replacing Iterate+Delete with LookupAndDelete would prevent some possible race conditions
@@ -395,8 +394,10 @@ func (m *FlowTracer) lookupAndDeleteMapKeysValues() ([]flow.RecordKey, [][]flow.RecordMetrics) {
 			log.WithError(err).WithField("flowId", id).
 				Warnf("couldn't delete flow entry")
 		}
-		flowKeys = append(flowKeys, id)
-		flowsValues = append(flowsValues, metrics)
+		// We observed that the eBPF PerCPU map might insert the same key multiple times
+		// (probably due to race conditions), so we need to re-join the metrics at userspace
+		// TODO: instrument how many times the keys are repeated in the same eviction
+		flows[id] = append(flows[id], metrics...)
 	}
-	return flowKeys, flowsValues
+	return flows
 }
```
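The userspace half of the fix: the old API returned parallel key/value slices, so a flow id yielded twice by the per-CPU map iteration became two separate records downstream. Returning a map keyed by flow.RecordKey re-joins those duplicates automatically. A minimal, self-contained sketch of that re-join, with hypothetical stand-ins for flow.RecordKey and flow.RecordMetrics:

```go
package main

import "fmt"

// Stand-ins for flow.RecordKey and flow.RecordMetrics, just for the demo.
type RecordKey struct{ SrcPort, DstPort uint16 }
type RecordMetrics struct{ Packets, Bytes uint64 }

// joinDuplicates mirrors the fix in lookupAndDeleteFlowsMap: iterating an
// eBPF PerCPU hash map may yield the same key more than once, so the
// per-CPU metric slices of a repeated key must be appended under one map
// entry instead of kept as parallel key/value slices.
func joinDuplicates(keys []RecordKey, perCPU [][]RecordMetrics) map[RecordKey][]RecordMetrics {
	flows := make(map[RecordKey][]RecordMetrics, len(keys))
	for i, key := range keys {
		flows[key] = append(flows[key], perCPU[i]...)
	}
	return flows
}

func main() {
	key := RecordKey{SrcPort: 443, DstPort: 53210}
	// The same flow key observed twice in one eviction, as the new
	// comment in the commit describes.
	keys := []RecordKey{key, key}
	perCPU := [][]RecordMetrics{
		{{Packets: 3, Bytes: 1200}},
		{{Packets: 2, Bytes: 800}},
	}
	joined := joinDuplicates(keys, perCPU)
	// Both samples end up under a single key, so the flow is aggregated
	// once instead of being forwarded as two fragments.
	fmt.Println(len(joined), len(joined[key])) // 1 2
}
```

With the previous parallel-slices return value, the two samples above would have surfaced as two records for the same flow, which is exactly the fragmentation this commit fixes.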
