
Commit c27bd51

Performance optimization using eBPF aggregation (#39)
Authored by praveingk and Mario Macias
* Add ebpf flows with per-cpu hashmap
* Add flags to perform eviction
* Add protocol to the evicted entry
* Add new headers to record
* tracer looksup the map on evicted entry
* Handle tcp reset flag
* Handle rst flag
* Enable timeout based eviction of ingress/egress maps
* ongoing performance measurements
* Add in logic in README
* Formatting changes to readme
* Format changes
* Correct the byte count
* Minor edits for tracer
* ebpf code with right byte count
* Update README.md
* Update README.md
* Latest measurements with multiflow and cpu,mem
* Add chart for throughput
* Cosmetic changes to measurements
* v6 support
* Logic for time calculation
* Cleanup of ebpf code
* Remove prints and cleanup
* Minor comments
* Remove unused lines
* bpf binary minor edit
* Alignment corrections
* Remove extra comments

Co-authored-by: Mario Macias <[email protected]>

* Refactor my_flow_id to id
* bug: Add direction variable while export
* Remove printf
* Handle hash collisions and improper deletions
* Remove stray debug entry
* Correct lint errors
* Add monotime module for CLOCK_MONOTONIC
* Use monotime instead of cgo
* Add monotime to go.sum
* Add monotime package to vendor
* tidy imports
* EvictionTimeout as duration
* fixed testmain
* Fixed tests
* fixing getPingFlows verification
* modify comment
* fix e2e test version
* Fix flow timing issue with reference time
* Tidy to fix lint errors
* fix errors in merge
* Remove TCP flag based eviction
* Remove TCP FIN/RST handling
* Remove redundant fields
* Simplify eBPF code optimizations (#49)
* simplifying ebpf agent
* version not really working well
* almost-working tests but I suspect that monotonic time could be doing bad stuff there
* not-yet-100% working tests
* reusing same bpfObjects for all the interfaces. That should decrease memory usage
* define max_entries at userspace
* avoid if/elses in C code map operations
* fixed compilation of unit tests
* wip: re-enable ring-buffer flows
* moved accounter inside tracer
* one single tracer for all the qdiscs and filters
* evict flows on ringbuffer
* Minor changes
* properly tested (and fixed) userspace accounter
* move eBPF system setup to flowtracer creation
* Discard old flows from being aggregated
* Fix zero-valued aggregated flows
* fix timestamp checking for flow discarding
* Unify ingress/egress maps
* Fixed build and test
* Updated generated eBPF binaries
* fix bug that caused that first flow could have start time == 0

Co-authored-by: Pravein <Pravein Govindan Kannan>
Co-authored-by: Mario Macias <[email protected]>
1 parent: 036919c · commit: c27bd51

43 files changed: +1348 additions, −661 deletions

Makefile

Lines changed: 2 additions & 2 deletions
```diff
@@ -81,8 +81,8 @@ generate: prereqs
 .PHONY: docker-generate
 docker-generate:
 	@echo "### Creating the container that generates the eBPF binaries"
-	docker build . -f scripts/generators.Dockerfile -t $(LOCAL_GENERATOR_IMAGE)
-	docker run --rm -v $(shell pwd):/src $(LOCAL_GENERATOR_IMAGE)
+	$(OCI_BIN) build . -f scripts/generators.Dockerfile -t $(LOCAL_GENERATOR_IMAGE)
+	$(OCI_BIN) run --rm -v $(shell pwd):/src $(LOCAL_GENERATOR_IMAGE)
 
 .PHONY: build
 build: prereqs fmt lint test vendors compile
```

bpf/README.md

Lines changed: 50 additions & 0 deletions
## Flows v2: An improved version of Netobserv eBPF Agent

### What Changed?

In the eBPF/TC code, v1 used a ringbuffer to export flow records to the userspace program.
Based on our measurements, the ringbuffer can become a bottleneck, since a record for each packet in the data-path needs to be sent to userspace, which eventually results in loss of records.
Additionally, this leads to high CPU utilization, since the userspace program has to stay constantly active to handle callback events on a per-packet basis.
Refer to the [Measurements slide-deck](../docs/measurements.pptx) for performance measurements.

To tackle this and achieve 100% monitoring coverage, the v2 eBPF/TC code uses a per-CPU hash map to aggregate flow-based records in the eBPF data-path, and proactively sends the records to userspace upon flow termination. The detailed logic is below:
#### eBPF Data-path Logic:

1) Store flow information in a per-CPU hash map. Separate per-CPU hash maps are maintained for ingress and egress to avoid performance bottlenecks.
   One design choice that still needs to be settled with performance measurements is whether v4 and v6 IPs should be kept in the same map or in separate ones.
   At a higher level, we also need to check whether increasing the map size (and hence the hash computation cost) affects throughput.
2) Upon packet arrival, a lookup is performed on the map.
   * If the lookup is successful, update the packet count, byte count, and the current timestamp.
   * If the lookup is unsuccessful, try creating a new entry in the map.
3) If entry creation failed because the map is full, send the entry to the userspace program via the ringbuffer.
4) Upon flow completion (tcp->fin/rst event), send the flow-id to userspace via the ringbuffer.
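The steps above can be modeled in userspace Go as a minimal sketch (not the agent's actual code; `FlowID`, `FlowMetrics`, and `maxEntries` are simplified stand-ins for the eBPF map types in `bpf/flow.h`):

```go
package main

import "fmt"

// FlowID is a simplified stand-in for the eBPF flow_id key.
type FlowID struct {
	SrcPort, DstPort uint16
	Protocol         uint8
}

// FlowMetrics mirrors the aggregated per-flow counters.
type FlowMetrics struct {
	Packets            uint32
	Bytes              uint64
	StartTime, EndTime uint64
}

const maxEntries = 2 // tiny capacity, to demonstrate the ringbuffer fallback

// account models the data-path logic: update an existing entry, create a
// new one, or report that the packet must be exported directly (step 3)
// because the map is full.
func account(flows map[FlowID]*FlowMetrics, id FlowID, bytes, now uint64) (direct bool) {
	if m, ok := flows[id]; ok { // step 2: lookup hit — aggregate in place
		m.Packets++
		m.Bytes += bytes
		m.EndTime = now
		return false
	}
	if len(flows) >= maxEntries { // step 3: map full — send via ringbuffer
		return true
	}
	flows[id] = &FlowMetrics{Packets: 1, Bytes: bytes, StartTime: now, EndTime: now}
	return false
}

func main() {
	flows := map[FlowID]*FlowMetrics{}
	id := FlowID{SrcPort: 443, DstPort: 5000, Protocol: 6}
	account(flows, id, 100, 1)
	account(flows, id, 200, 2)
	fmt.Printf("packets=%d bytes=%d\n", flows[id].Packets, flows[id].Bytes) // packets=2 bytes=300
}
```

In the real data path this map lives in the kernel, so the "direct" branch corresponds to `bpf_ringbuf_reserve`/`bpf_ringbuf_submit` in `bpf/flows.c`.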
##### Hash collisions

One downside of using a hash-based map is that, when flows are hashed into the per-CPU map, hash collisions can occur, causing multiple different flows to map to the same entry and hence leading to inaccurate flow entries. To handle hash collisions we do the following:
1) In each flow entry, we additionally maintain the full key/id.
2) Before a packet's entry is updated in the map, the stored key is compared against the packet's key to check whether another flow is already residing in that entry.
3) If another flow is there, we do not want to update the wrong entry. Hence, we send the new packet entry directly to userspace via the ringbuffer, after setting a flag to report the collision.
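The collision check can be pictured with a toy open-addressed table in Go (a hypothetical sketch; the real check happens inside the eBPF map update in `bpf/flows.c`):

```go
package main

import "fmt"

// entry stores the full flow key alongside its counters, so a bucket hit
// can be verified before updating (steps 1 and 2 above).
type entry struct {
	key   string
	bytes uint64
}

const nBuckets = 4

// bucket is a toy hash function mapping a key to a bucket index.
func bucket(key string) int {
	h := 0
	for _, c := range key {
		h = (h*31 + int(c)) % nBuckets
	}
	return h
}

// upsert returns collision=true when the bucket already holds a different
// flow; the caller would then export the packet directly via the
// ringbuffer with a collision flag (step 3) instead of corrupting the
// resident entry.
func upsert(buckets []*entry, key string, bytes uint64) (collision bool) {
	b := bucket(key)
	switch {
	case buckets[b] == nil:
		buckets[b] = &entry{key: key, bytes: bytes}
	case buckets[b].key == key:
		buckets[b].bytes += bytes
	default:
		return true // a different flow resides here — leave it untouched
	}
	return false
}

func main() {
	buckets := make([]*entry, nBuckets)
	fmt.Println(upsert(buckets, "flowA", 100)) // false: new entry created
	fmt.Println(upsert(buckets, "flowA", 50))  // false: same key, aggregated
}
```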
#### User-space program Logic: (Refer [tracer.go](../pkg/ebpf/tracer.go))

The userspace program has three active threads:

1) **Trace**:
   a) If the flow-id received from the eBPF data-path via the ringbuffer is a flow completion (indicated via the flags), it does the following:
      * ScrubFlow: performs a lookup of the flow-id in the ingress/egress map, aggregates the metrics from the different per-CPU counters, and then deletes the entry corresponding to the flow-id from the map.
      * Exports the aggregated flow record to the accounter pipeline.
   b) If the received flow-id is not a flow completion event, it just forwards the record to the accounter pipeline; the accounter will aggregate it later, upon flow completion.
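A hedged Go sketch of the Trace loop, modeled with channels (`Record`, `scrub`, and the channels are illustrative, not tracer.go's actual API):

```go
package main

import "fmt"

// Record models an event read from the ringbuffer.
type Record struct {
	ID        string
	Completed bool // set by the eBPF side on FIN/RST
	Packets   uint32
	Bytes     uint64
}

// trace drains ringbuffer events: completions trigger a scrub (aggregate
// the per-CPU counters and delete the map entry) and export of the
// aggregate; all other records are forwarded unchanged for the accounter
// to aggregate later.
func trace(events <-chan Record, scrub func(id string) Record, export chan<- Record) {
	for ev := range events {
		if ev.Completed {
			export <- scrub(ev.ID)
		} else {
			export <- ev
		}
	}
}

func main() {
	events := make(chan Record, 2)
	export := make(chan Record, 2)
	events <- Record{ID: "a", Completed: true}
	events <- Record{ID: "b", Bytes: 10}
	close(events)
	// scrub stub: pretend the map held an aggregate for flow "a"
	trace(events, func(id string) Record {
		return Record{ID: id, Completed: true, Packets: 5, Bytes: 500}
	}, export)
	fmt.Println((<-export).Bytes, (<-export).Bytes) // 500 10
}
```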
2) **MonitorIngress**:
   This is a periodic thread that wakes up every n seconds and does the following:
   a) Creates a map iterator and iterates over each entry in the map.
   b) Evicts an entry if the condition is met:
      * The timestamp of the last seen packet in the flow is more than m seconds ago.
      * Other eviction policies could also be implemented, for example based on the packets/bytes observed, or a more aggressive eviction when the map is k% full. These are further improvements to fine-tune map usage for the scenario and use-case.
   c) Each evicted entry is aggregated into a flow-record and forwarded to the accounter pipeline.
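The timeout-based eviction in step b) could look like this in Go (a sketch only; `lastSeen` mapping flow-id to last-seen timestamp is a hypothetical simplification of the per-CPU map's values):

```go
package main

import (
	"fmt"
	"sort"
)

// evictStale deletes flows whose last packet was seen more than timeout
// nanoseconds ago and returns their ids so they can be aggregated into
// flow records for the accounter pipeline.
func evictStale(lastSeen map[string]uint64, now, timeout uint64) []string {
	var evicted []string
	for id, ts := range lastSeen {
		if now-ts > timeout {
			evicted = append(evicted, id)
			delete(lastSeen, id)
		}
	}
	sort.Strings(evicted) // deterministic order for display
	return evicted
}

func main() {
	lastSeen := map[string]uint64{"old": 100, "fresh": 950}
	fmt.Println(evictStale(lastSeen, 1000, 500)) // [old]
}
```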
3) **MonitorEgress**:
   This is a periodic thread that performs the same task as MonitorIngress, but on the egress map.
##### Hash Collision handling in user-space

Despite handling hash collisions in the eBPF datapath, multiple flows can still map to the same entry, because the per-CPU map maintains a separate value per CPU: flows from different CPUs may land in the same map entry while residing in different per-CPU slots. Hence, during aggregation, we check the key before summing the per-CPU values of a flow. When a mismatching slot is detected, we export that entry to the accounter on its own; since the full flow key is stored along with each entry, such collided entries can be recovered and sent to the accounter.
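The per-CPU aggregation with the key check might be sketched as follows (illustrative; `cpuSlot` stands in for one CPU's value in the per-cpu map):

```go
package main

import "fmt"

// cpuSlot models one CPU's value in the per-cpu hash map: the full flow
// key stored alongside the counters, as described above.
type cpuSlot struct {
	Key     string
	Packets uint32
	Bytes   uint64
}

// scrubFlow sums the slots whose stored key matches id; slots holding a
// different flow (a cross-CPU collision) are returned separately so they
// can be exported to the accounter on their own.
func scrubFlow(id string, slots []cpuSlot) (total cpuSlot, collided []cpuSlot) {
	total.Key = id
	for _, s := range slots {
		switch {
		case s.Key == id:
			total.Packets += s.Packets
			total.Bytes += s.Bytes
		case s.Packets > 0:
			collided = append(collided, s)
		}
	}
	return total, collided
}

func main() {
	slots := []cpuSlot{
		{Key: "f1", Packets: 2, Bytes: 200}, // CPU 0
		{Key: "f1", Packets: 1, Bytes: 50},  // CPU 1
		{Key: "f2", Packets: 4, Bytes: 400}, // CPU 2: a different flow collided here
	}
	total, collided := scrubFlow("f1", slots)
	fmt.Println(total.Packets, total.Bytes, len(collided)) // 3 250 1
}
```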

bpf/flow.h

Lines changed: 30 additions & 27 deletions
```diff
@@ -10,37 +10,40 @@ typedef __u16 u16;
 typedef __u32 u32;
 typedef __u64 u64;
 
-// L2 data link layer
-struct data_link {
+typedef struct flow_metrics_t {
+    u32 packets;
+    u64 bytes;
+    // Flow start and end times as monotomic timestamps in nanoseconds
+    // as output from bpf_ktime_get_ns()
+    u64 start_mono_time_ts;
+    u64 end_mono_time_ts;
+} __attribute__((packed)) flow_metrics;
+
+// Attributes that uniquely identify a flow
+typedef struct flow_id_t {
+    u16 eth_protocol;
+    u8 direction;
+    // L2 data link layer
     u8 src_mac[ETH_ALEN];
     u8 dst_mac[ETH_ALEN];
-} __attribute__((packed));
-
-// L3 network layer
-// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
-// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
-struct network {
+    // L3 network layer
+    // IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
+    // as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
     struct in6_addr src_ip;
     struct in6_addr dst_ip;
-} __attribute__((packed));
-
-// L4 transport layer
-struct transport {
+    // L4 transport layer
     u16 src_port;
     u16 dst_port;
-    u8 protocol;
-} __attribute__((packed));
-
-// TODO: L5 session layer to bound flows to connections?
-
-// contents in this struct must match byte-by-byte with Go's pkc/flow/Record struct
-struct flow {
-    u16 protocol;
-    u8 direction;
-    struct data_link data_link;
-    struct network network;
-    struct transport transport;
-    u64 bytes;
-} __attribute__((packed));
-
+    u8 transport_protocol;
+    // OS interface index
+    u32 if_index;
+} __attribute__((packed)) flow_id;
+
+// Flow record is a tuple containing both flow identifier and metrics. It is used to send
+// a complete flow via ring buffer when only when the accounting hashmap is full.
+// Contents in this struct must match byte-by-byte with Go's pkc/flow/Record struct
+typedef struct flow_record_t {
+    flow_id id;
+    flow_metrics metrics;
+} __attribute__((packed)) flow_record;
 #endif
```

bpf/flows.c

Lines changed: 120 additions & 57 deletions
```diff
@@ -1,10 +1,33 @@
+/*
+    Flows v2. A Flow-metric generator using TC.
+
+    This program can be hooked on to TC ingress/egress hook to monitor packets
+    to/from an interface.
+
+    Logic:
+        1) Store flow information in a per-cpu hash map.
+        2) Upon flow completion (tcp->fin event), evict the entry from map, and
+           send to userspace through ringbuffer.
+           Eviction for non-tcp flows need to done by userspace
+        3) When the map is full, we send the new flow entry to userspace via ringbuffer,
+           until an entry is available.
+        4) When hash collision is detected, we send the new entry to userpace via ringbuffer.
+*/
+#include <linux/bpf.h>
+#include <linux/in.h>
+#include <linux/if_packet.h>
+#include <linux/if_vlan.h>
 #include <linux/ip.h>
+#include <linux/if_ether.h>
 #include <linux/ipv6.h>
-#include <linux/in.h>
-#include <linux/tcp.h>
+#include <linux/icmp.h>
+#include <linux/icmpv6.h>
 #include <linux/udp.h>
-#include <linux/bpf.h>
-#include <linux/types.h>
+#include <linux/tcp.h>
+#include <string.h>
+
+#include <stdbool.h>
 #include <linux/if_ether.h>
 
 #include <bpf_helpers.h>
@@ -19,43 +42,51 @@
 #define INGRESS 0
 #define EGRESS 1
 
-// TODO: for performance reasons, replace the ring buffer by a hashmap and
-// aggregate the flows here instead of the Go Accounter
+// Common Ringbuffer as a conduit for ingress/egress flows to userspace
 struct {
     __uint(type, BPF_MAP_TYPE_RINGBUF);
     __uint(max_entries, 1 << 24);
-} flows SEC(".maps");
+} direct_flows SEC(".maps");
+
+// Key: the flow identifier. Value: the flow metrics for that identifier.
+// The userspace will aggregate them into a single flow.
+struct {
+    __uint(type, BPF_MAP_TYPE_PERCPU_HASH);
+    __type(key, flow_id);
+    __type(value, flow_metrics);
+} aggregated_flows SEC(".maps");
 
 // Constant definitions, to be overridden by the invoker
 volatile const u32 sampling = 0;
 
 const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
 
 // sets flow fields from IPv4 header information
-static inline int fill_iphdr(struct iphdr *ip, void *data_end, struct flow *flow) {
+static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) {
     if ((void *)ip + sizeof(*ip) > data_end) {
         return DISCARD;
     }
 
-    __builtin_memcpy(flow->network.src_ip.s6_addr, ip4in6, sizeof(ip4in6));
-    __builtin_memcpy(flow->network.dst_ip.s6_addr, ip4in6, sizeof(ip4in6));
-    __builtin_memcpy(flow->network.src_ip.s6_addr + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
-    __builtin_memcpy(flow->network.dst_ip.s6_addr + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));
-    flow->transport.protocol = ip->protocol;
-
+    __builtin_memcpy(id->src_ip.s6_addr, ip4in6, sizeof(ip4in6));
+    __builtin_memcpy(id->dst_ip.s6_addr, ip4in6, sizeof(ip4in6));
+    __builtin_memcpy(id->src_ip.s6_addr + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
+    __builtin_memcpy(id->dst_ip.s6_addr + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));
+    id->transport_protocol = ip->protocol;
+    id->src_port = 0;
+    id->dst_port = 0;
     switch (ip->protocol) {
     case IPPROTO_TCP: {
         struct tcphdr *tcp = (void *)ip + sizeof(*ip);
         if ((void *)tcp + sizeof(*tcp) <= data_end) {
-            flow->transport.src_port = __bpf_ntohs(tcp->source);
-            flow->transport.dst_port = __bpf_ntohs(tcp->dest);
+            id->src_port = __bpf_ntohs(tcp->source);
+            id->dst_port = __bpf_ntohs(tcp->dest);
         }
     } break;
     case IPPROTO_UDP: {
         struct udphdr *udp = (void *)ip + sizeof(*ip);
         if ((void *)udp + sizeof(*udp) <= data_end) {
-            flow->transport.src_port = __bpf_ntohs(udp->source);
-            flow->transport.dst_port = __bpf_ntohs(udp->dest);
+            id->src_port = __bpf_ntohs(udp->source);
+            id->dst_port = __bpf_ntohs(udp->dest);
         }
     } break;
     default:
@@ -65,28 +96,29 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, struct flow *flow
 }
 
 // sets flow fields from IPv6 header information
-static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, struct flow *flow) {
+static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) {
     if ((void *)ip + sizeof(*ip) > data_end) {
         return DISCARD;
     }
 
-    flow->network.src_ip = ip->saddr;
-    flow->network.dst_ip = ip->daddr;
-    flow->transport.protocol = ip->nexthdr;
-
+    id->src_ip = ip->saddr;
+    id->dst_ip = ip->daddr;
+    id->transport_protocol = ip->nexthdr;
+    id->src_port = 0;
+    id->dst_port = 0;
     switch (ip->nexthdr) {
     case IPPROTO_TCP: {
         struct tcphdr *tcp = (void *)ip + sizeof(*ip);
         if ((void *)tcp + sizeof(*tcp) <= data_end) {
-            flow->transport.src_port = __bpf_ntohs(tcp->source);
-            flow->transport.dst_port = __bpf_ntohs(tcp->dest);
+            id->src_port = __bpf_ntohs(tcp->source);
+            id->dst_port = __bpf_ntohs(tcp->dest);
         }
     } break;
     case IPPROTO_UDP: {
         struct udphdr *udp = (void *)ip + sizeof(*ip);
         if ((void *)udp + sizeof(*udp) <= data_end) {
-            flow->transport.src_port = __bpf_ntohs(udp->source);
-            flow->transport.dst_port = __bpf_ntohs(udp->dest);
+            id->src_port = __bpf_ntohs(udp->source);
+            id->dst_port = __bpf_ntohs(udp->dest);
         }
     } break;
     default:
@@ -95,59 +127,90 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, struct flow *f
     return SUBMIT;
 }
 // sets flow fields from Ethernet header information
-static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, struct flow *flow) {
+static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) {
     if ((void *)eth + sizeof(*eth) > data_end) {
         return DISCARD;
     }
-    __builtin_memcpy(flow->data_link.dst_mac, eth->h_dest, ETH_ALEN);
-    __builtin_memcpy(flow->data_link.src_mac, eth->h_source, ETH_ALEN);
-    flow->protocol = __bpf_ntohs(eth->h_proto);
-    // TODO: ETH_P_IPV6
-    if (flow->protocol == ETH_P_IP) {
+    __builtin_memcpy(id->dst_mac, eth->h_dest, ETH_ALEN);
+    __builtin_memcpy(id->src_mac, eth->h_source, ETH_ALEN);
+    id->eth_protocol = __bpf_ntohs(eth->h_proto);
+
+    if (id->eth_protocol == ETH_P_IP) {
         struct iphdr *ip = (void *)eth + sizeof(*eth);
-        return fill_iphdr(ip, data_end, flow);
-    } else if (flow->protocol == ETH_P_IPV6) {
+        return fill_iphdr(ip, data_end, id);
+    } else if (id->eth_protocol == ETH_P_IPV6) {
         struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth);
-        return fill_ip6hdr(ip6, data_end, flow);
+        return fill_ip6hdr(ip6, data_end, id);
+    } else {
+        // TODO : Need to implement other specific ethertypes if needed
+        // For now other parts of flow id remain zero
+        memset (&(id->src_ip),0, sizeof(struct in6_addr));
+        memset (&(id->dst_ip),0, sizeof(struct in6_addr));
+        id->transport_protocol = 0;
+        id->src_port = 0;
+        id->dst_port = 0;
     }
     return SUBMIT;
 }
 
-// parses flow information for a given direction (ingress/egress)
-static inline int flow_parse(struct __sk_buff *skb, u8 direction) {
 
+static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
     // If sampling is defined, will only parse 1 out of "sampling" flows
     if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
         return TC_ACT_OK;
     }
-
-    void *data = (void *)(long)skb->data;
     void *data_end = (void *)(long)skb->data_end;
+    void *data = (void *)(long)skb->data;
 
-    struct flow *flow = bpf_ringbuf_reserve(&flows, sizeof(struct flow), 0);
-    if (!flow) {
+    flow_id id;
+    u64 current_time = bpf_ktime_get_ns();
+    struct ethhdr *eth = data;
+    if (fill_ethhdr(eth, data_end, &id) == DISCARD) {
         return TC_ACT_OK;
     }
+    id.if_index = skb->ifindex;
+    id.direction = direction;
 
-    struct ethhdr *eth = data;
-    if (fill_ethhdr(eth, data_end, flow) == DISCARD) {
-        bpf_ringbuf_discard(flow, 0);
+    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
+    if (aggregate_flow != NULL) {
+        aggregate_flow->packets += 1;
+        aggregate_flow->bytes += skb->len;
+        aggregate_flow->end_mono_time_ts = current_time;
+
+        bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_EXIST);
     } else {
-        flow->direction = direction;
-        flow->bytes = skb->len;
-        bpf_ringbuf_submit(flow, 0);
+        // Key does not exist in the map, and will need to create a new entry.
+        flow_metrics new_flow = {
+            .packets = 1,
+            .bytes=skb->len,
+            .start_mono_time_ts = current_time,
+            .end_mono_time_ts = current_time,
+        };
+
+        if (bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST) != 0) {
+            /*
+                When the map is full, we directly send the flow entry to userspace via ringbuffer,
+                until space is available in the kernel-side maps
+            */
+            flow_record *record = bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
+            if (!record) {
+                return TC_ACT_OK;
+            }
+            record->id = id;
+            record->metrics = new_flow;
+            bpf_ringbuf_submit(record, 0);
+        }
     }
     return TC_ACT_OK;
-}
 
-SEC("tc/ingress_flow_parse")
-static inline int ingress_flow_parse(struct __sk_buff *skb) {
-    return flow_parse(skb, INGRESS);
 }
-
-SEC("tc/egress_flow_parse")
-static inline int egress_flow_parse(struct __sk_buff *skb) {
-    return flow_parse(skb, EGRESS);
+SEC("tc_ingress")
+int ingress_flow_parse (struct __sk_buff *skb) {
+    return flow_monitor(skb, INGRESS);
 }
 
-char __license[] SEC("license") = "GPL";
+SEC("tc_egress")
+int egress_flow_parse (struct __sk_buff *skb) {
+    return flow_monitor(skb, EGRESS);
+}
+char _license[] SEC("license") = "GPL";
```

docs/measurements.pptx — 75.9 KB (binary file not shown)
