Skip to content

Commit faf274e

Browse files
authored
Revert "change aggregation flow map to hashmap instead perCPU hashmap (#118)" (#172)
This reverts commit b6e2b87.

Signed-off-by: msherif1234 <[email protected]>
1 parent a7d4eec commit faf274e

File tree

16 files changed

+172
-53
lines changed

16 files changed

+172
-53
lines changed

bpf/flows.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
7575
aggregate_flow->packets += 1;
7676
aggregate_flow->bytes += skb->len;
7777
aggregate_flow->end_mono_time_ts = pkt.current_ts;
78+
// it might happen that start_mono_time hasn't been set due to
79+
// the way the per-CPU hashmap deals with concurrent map entries
80+
if (aggregate_flow->start_mono_time_ts == 0) {
81+
aggregate_flow->start_mono_time_ts = pkt.current_ts;
82+
}
7883
aggregate_flow->flags |= pkt.flags;
7984

8085
// Does not matter the gate. Will be zero if not enabled.

bpf/maps_definition.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ struct {
1111

1212
// Key: the flow identifier. Value: the flow metrics for that identifier.
1313
struct {
14-
__uint(type, BPF_MAP_TYPE_HASH);
14+
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
1515
__type(key, flow_id);
1616
__type(value, flow_metrics);
1717
__uint(max_entries, 1 << 24);

bpf/utils.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,12 +253,13 @@ static inline long pkt_drop_lookup_and_update_flow(struct sk_buff *skb, flow_id
253253
enum skb_drop_reason reason) {
254254
flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
255255
if (aggregate_flow != NULL) {
256+
aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
256257
aggregate_flow->pkt_drops.packets += 1;
257258
aggregate_flow->pkt_drops.bytes += skb->len;
258259
aggregate_flow->pkt_drops.latest_state = state;
259260
aggregate_flow->pkt_drops.latest_flags = flags;
260261
aggregate_flow->pkt_drops.latest_drop_cause = reason;
261-
long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_ANY);
262+
long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_EXIST);
262263
if (trace_messages && ret != 0) {
263264
bpf_printk("error packet drop updating flow %d\n", ret);
264265
}

docs/architecture.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ flowchart TD
1111
E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
1212
style E fill:#990
1313
14-
E --> |"polls<br/>HashMap"| M(flow.MapTracer)
14+
E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
1515
RB --> |chan *flow.Record| ACC(flow.Accounter)
1616
RB -.-> |flushes| M
1717
ACC --> |"chan []*flow.Record"| DD(flow.Deduper)

examples/flowlogs-dump/server/flowlogs-dump-collector.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func main() {
7272
for records := range receivedRecords {
7373
for _, record := range records.Entries {
7474
if record.EthProtocol == ipv6 {
75-
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
75+
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
7676
ipProto[record.EthProtocol],
7777
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
7878
record.Interface,
@@ -92,9 +92,12 @@ func main() {
9292
record.GetDnsFlags(),
9393
record.DnsLatency.AsDuration().Milliseconds(),
9494
record.TimeFlowRtt.AsDuration().Nanoseconds(),
95+
record.GetPktDropPackets(),
96+
record.GetPktDropBytes(),
97+
record.GetPktDropLatestDropCause(),
9598
)
9699
} else {
97-
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v\n",
100+
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
98101
ipProto[record.EthProtocol],
99102
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
100103
record.Interface,
@@ -114,6 +117,9 @@ func main() {
114117
record.GetDnsFlags(),
115118
record.DnsLatency.AsDuration().Milliseconds(),
116119
record.TimeFlowRtt.AsDuration().Nanoseconds(),
120+
record.GetPktDropPackets(),
121+
record.GetPktDropBytes(),
122+
record.GetPktDropLatestDropCause(),
117123
)
118124
}
119125
}

pkg/agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ type ebpfFlowFetcher interface {
7878
io.Closer
7979
Register(iface ifaces.Interface) error
8080

81-
LookupAndDeleteMap() map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics
81+
LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
8282
DeleteMapsStaleEntries(timeOut time.Duration)
8383
ReadRingBuf() (ringbuf.Record, error)
8484
}

pkg/agent/agent_test.go

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ var (
4949
DstPort: 456,
5050
IfIndex: 3,
5151
}
52+
key1Dupe = ebpf.BpfFlowId{
53+
SrcPort: 123,
54+
DstPort: 456,
55+
IfIndex: 4,
56+
}
5257

5358
key2 = ebpf.BpfFlowId{
5459
SrcPort: 333,
@@ -66,7 +71,7 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
6671
})
6772

6873
exported := export.Get(t, timeout)
69-
assert.Len(t, exported, 1)
74+
assert.Len(t, exported, 2)
7075

7176
receivedKeys := map[ebpf.BpfFlowId]struct{}{}
7277

@@ -76,11 +81,21 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
7681
receivedKeys[f.Id] = struct{}{}
7782
switch f.Id {
7883
case key1:
79-
assert.EqualValues(t, 3, f.Metrics.Packets)
80-
assert.EqualValues(t, 44, f.Metrics.Bytes)
84+
assert.EqualValues(t, 4, f.Metrics.Packets)
85+
assert.EqualValues(t, 66, f.Metrics.Bytes)
8186
assert.False(t, f.Duplicate)
8287
assert.Equal(t, "foo", f.Interface)
8388
key1Flows = append(key1Flows, f)
89+
case key1Dupe:
90+
assert.EqualValues(t, 4, f.Metrics.Packets)
91+
assert.EqualValues(t, 66, f.Metrics.Bytes)
92+
assert.False(t, f.Duplicate)
93+
assert.Equal(t, "bar", f.Interface)
94+
key1Flows = append(key1Flows, f)
95+
case key2:
96+
assert.EqualValues(t, 7, f.Metrics.Packets)
97+
assert.EqualValues(t, 33, f.Metrics.Bytes)
98+
assert.False(t, f.Duplicate)
8499
}
85100
}
86101
assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows)
@@ -97,22 +112,33 @@ func TestFlowsAgent_DeduplicationJustMark(t *testing.T) {
97112
exported := export.Get(t, timeout)
98113
receivedKeys := map[ebpf.BpfFlowId]struct{}{}
99114

100-
assert.Len(t, exported, 1)
115+
assert.Len(t, exported, 3)
101116
duplicates := 0
102117
for _, f := range exported {
103118
require.NotContains(t, receivedKeys, f.Id)
104119
receivedKeys[f.Id] = struct{}{}
105120
switch f.Id {
106121
case key1:
107-
assert.EqualValues(t, 3, f.Metrics.Packets)
108-
assert.EqualValues(t, 44, f.Metrics.Bytes)
122+
assert.EqualValues(t, 4, f.Metrics.Packets)
123+
assert.EqualValues(t, 66, f.Metrics.Bytes)
109124
if f.Duplicate {
110125
duplicates++
111126
}
112127
assert.Equal(t, "foo", f.Interface)
128+
case key1Dupe:
129+
assert.EqualValues(t, 4, f.Metrics.Packets)
130+
assert.EqualValues(t, 66, f.Metrics.Bytes)
131+
if f.Duplicate {
132+
duplicates++
133+
}
134+
assert.Equal(t, "bar", f.Interface)
135+
case key2:
136+
assert.EqualValues(t, 7, f.Metrics.Packets)
137+
assert.EqualValues(t, 33, f.Metrics.Bytes)
138+
assert.False(t, f.Duplicate)
113139
}
114140
}
115-
assert.Equalf(t, 0, duplicates, "exported flows should have only one duplicate: %#v", exported)
141+
assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported)
116142
}
117143

118144
func TestFlowsAgent_Deduplication_None(t *testing.T) {
@@ -123,7 +149,7 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
123149
})
124150

125151
exported := export.Get(t, timeout)
126-
assert.Len(t, exported, 1)
152+
assert.Len(t, exported, 3)
127153
receivedKeys := map[ebpf.BpfFlowId]struct{}{}
128154

129155
var key1Flows []*flow.Record
@@ -132,14 +158,24 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
132158
receivedKeys[f.Id] = struct{}{}
133159
switch f.Id {
134160
case key1:
135-
assert.EqualValues(t, 3, f.Metrics.Packets)
136-
assert.EqualValues(t, 44, f.Metrics.Bytes)
161+
assert.EqualValues(t, 4, f.Metrics.Packets)
162+
assert.EqualValues(t, 66, f.Metrics.Bytes)
137163
assert.False(t, f.Duplicate)
138164
assert.Equal(t, "foo", f.Interface)
139165
key1Flows = append(key1Flows, f)
166+
case key1Dupe:
167+
assert.EqualValues(t, 4, f.Metrics.Packets)
168+
assert.EqualValues(t, 66, f.Metrics.Bytes)
169+
assert.False(t, f.Duplicate)
170+
assert.Equal(t, "bar", f.Interface)
171+
key1Flows = append(key1Flows, f)
172+
case key2:
173+
assert.EqualValues(t, 7, f.Metrics.Packets)
174+
assert.EqualValues(t, 33, f.Metrics.Bytes)
175+
assert.False(t, f.Duplicate)
140176
}
141177
}
142-
assert.Lenf(t, key1Flows, 1, "both key1 flows should have been forwarded: %#v", key1Flows)
178+
assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
143179
}
144180

145181
func TestFlowsAgent_Decoration(t *testing.T) {
@@ -149,7 +185,7 @@ func TestFlowsAgent_Decoration(t *testing.T) {
149185
})
150186

151187
exported := export.Get(t, timeout)
152-
assert.Len(t, exported, 1)
188+
assert.Len(t, exported, 3)
153189

154190
// Tests that the decoration stage has been properly executed. It should
155191
// add the interface name and the agent IP
@@ -183,10 +219,17 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
183219
})
184220

185221
now := uint64(monotime.Now())
186-
key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}
187-
188-
ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{
189-
key1: &key1Metrics,
222+
key1Metrics := []ebpf.BpfFlowMetrics{
223+
{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000},
224+
{Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000},
225+
}
226+
key2Metrics := []ebpf.BpfFlowMetrics{
227+
{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000},
228+
}
229+
ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{
230+
key1: key1Metrics,
231+
key1Dupe: key1Metrics,
232+
key2: key2Metrics,
190233
})
191234
return export
192235
}

pkg/ebpf/bpf_bpfeb.o

904 Bytes
Binary file not shown.

pkg/ebpf/bpf_bpfel.o

936 Bytes
Binary file not shown.

pkg/ebpf/tracer.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -377,27 +377,28 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
377377
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
378378
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
379379
// Race conditions here causes that some flows are lost in high-load scenarios
380-
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]*BpfFlowMetrics {
380+
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics {
381381
flowMap := m.objects.AggregatedFlows
382382

383383
iterator := flowMap.Iterate()
384-
var flow = make(map[BpfFlowId]*BpfFlowMetrics, m.cacheMaxSize)
384+
var flows = make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
385385
var id BpfFlowId
386-
var metric BpfFlowMetrics
386+
var metrics []BpfFlowMetrics
387387

388388
// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
389389
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
390-
for iterator.Next(&id, &metric) {
390+
for iterator.Next(&id, &metrics) {
391391
if err := flowMap.Delete(id); err != nil {
392392
log.WithError(err).WithField("flowId", id).
393393
Warnf("couldn't delete flow entry")
394394
}
395-
metricPtr := new(BpfFlowMetrics)
396-
*metricPtr = metric
397-
flow[id] = metricPtr
395+
// We observed that the eBPF PerCPU map might insert the same key multiple times into the map
396+
// (probably due to race conditions) so we need to re-join metrics again at userspace
397+
// TODO: instrument how many times the keys are repeated in the same eviction
398+
flows[id] = append(flows[id], metrics...)
398399
}
399400

400-
return flow
401+
return flows
401402
}
402403

403404
// DeleteMapsStaleEntries Look for any stale entries in the features maps and delete them

0 commit comments

Comments
 (0)