Skip to content

Commit 45b4757

Browse files
shach33praveingk
andauthored
Added TCP Flags to Flow Record metrics (#68)
* Test commit * Adding TCP flags * Adding TCP flags to record metrics.TODO: Remove log stmts * Updated tcp flags u32-> u16 * Updated indentation in C file * Update flow.h * Indent fixed. Tab -> 4 space fixing indents Indent fixes for flows.c Indent fixes for flows.c-II Still fixing indents * Removed extra file * Added clang-format config to bpf/. To use clang-format -i <file.c> * Fixed message record member sequences * Added TCP Flags combinations Read from flw Updated changes according to comments on PR * Removed log statements. Rebase merges * Fixing merge conflits * Revert the renaming of grpc_test file * Simplify setting flags for v4,v6 * Add flags to ipfix exporter * Update protobuf * Remove type conversions due to conflicts * Remove .gitignore change * Fix indentation * Remove commented line * Fixed typecast errors since move to go 1.18 * Reverting commit d1e9c44. Added changes as separate PR * set flags for v6 Co-authored-by: Pravein Govindan Kannan <[email protected]>
1 parent 0330453 commit 45b4757

File tree

19 files changed

+238
-166
lines changed

19 files changed

+238
-166
lines changed

bpf/.clang-format

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
BasedOnStyle: LLVM,
3+
AllowShortFunctionsOnASingleLine: InlineOnly,
4+
ColumnLimit: 100,
5+
IndentWidth: 4,
6+
SortIncludes: false,
7+
ReflowComments: false,
8+
TabWidth: 4,
9+
}

bpf/flow.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ typedef struct flow_metrics_t {
1717
// as output from bpf_ktime_get_ns()
1818
u64 start_mono_time_ts;
1919
u64 end_mono_time_ts;
20+
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt
21+
u16 flags;
2022
// The positive errno of a failed map insertion that caused a flow
2123
// to be sent via ringbuffer.
2224
// 0 otherwise

bpf/flows.c

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
#include <linux/udp.h>
2727
#include <linux/tcp.h>
2828
#include <string.h>
29-
3029
#include <stdbool.h>
3130
#include <linux/if_ether.h>
3231

@@ -42,6 +41,20 @@
4241
#define INGRESS 0
4342
#define EGRESS 1
4443

44+
// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
45+
#define FIN_FLAG 0x01
46+
#define SYN_FLAG 0x02
47+
#define RST_FLAG 0x04
48+
#define PSH_FLAG 0x08
49+
#define ACK_FLAG 0x10
50+
#define URG_FLAG 0x20
51+
#define ECE_FLAG 0x40
52+
#define CWR_FLAG 0x80
53+
// Custom flags exported
54+
#define SYN_ACK_FLAG 0x100
55+
#define FIN_ACK_FLAG 0x200
56+
#define RST_ACK_FLAG 0x400
57+
4558
// Common Ringbuffer as a conduit for ingress/egress flows to userspace
4659
struct {
4760
__uint(type, BPF_MAP_TYPE_RINGBUF);
@@ -62,8 +75,35 @@ volatile const u8 trace_messages = 0;
6275

6376
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
6477

78+
// sets the TCP header flags for connection information
79+
static inline void set_flags(struct tcphdr *th, u16 *flags) {
80+
//If both ACK and SYN are set, then it is server -> client communication during 3-way handshake.
81+
if (th->ack && th->syn) {
82+
*flags |= SYN_ACK_FLAG;
83+
} else if (th->ack && th->fin ) {
84+
// If both ACK and FIN are set, then it is graceful termination from server.
85+
*flags |= FIN_ACK_FLAG;
86+
} else if (th->ack && th->rst ) {
87+
// If both ACK and RST are set, then it is abrupt connection termination.
88+
*flags |= RST_ACK_FLAG;
89+
} else if (th->fin) {
90+
*flags |= FIN_FLAG;
91+
} else if (th->syn) {
92+
*flags |= SYN_FLAG;
93+
} else if (th->rst) {
94+
*flags |= RST_FLAG;
95+
} else if (th->psh) {
96+
*flags |= PSH_FLAG;
97+
} else if (th->urg) {
98+
*flags |= URG_FLAG;
99+
} else if (th->ece) {
100+
*flags |= ECE_FLAG;
101+
} else if (th->cwr) {
102+
*flags |= CWR_FLAG;
103+
}
104+
}
65105
// sets flow fields from IPv4 header information
66-
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) {
106+
static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id, u16 *flags) {
67107
if ((void *)ip + sizeof(*ip) > data_end) {
68108
return DISCARD;
69109
}
@@ -81,6 +121,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) {
81121
if ((void *)tcp + sizeof(*tcp) <= data_end) {
82122
id->src_port = __bpf_ntohs(tcp->source);
83123
id->dst_port = __bpf_ntohs(tcp->dest);
124+
set_flags(tcp, flags);
84125
}
85126
} break;
86127
case IPPROTO_UDP: {
@@ -97,7 +138,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, flow_id *id) {
97138
}
98139

99140
// sets flow fields from IPv6 header information
100-
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) {
141+
static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id, u16 *flags) {
101142
if ((void *)ip + sizeof(*ip) > data_end) {
102143
return DISCARD;
103144
}
@@ -113,6 +154,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) {
113154
if ((void *)tcp + sizeof(*tcp) <= data_end) {
114155
id->src_port = __bpf_ntohs(tcp->source);
115156
id->dst_port = __bpf_ntohs(tcp->dest);
157+
set_flags(tcp, flags);
116158
}
117159
} break;
118160
case IPPROTO_UDP: {
@@ -128,7 +170,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, flow_id *id) {
128170
return SUBMIT;
129171
}
130172
// sets flow fields from Ethernet header information
131-
static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) {
173+
static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u16 *flags) {
132174
if ((void *)eth + sizeof(*eth) > data_end) {
133175
return DISCARD;
134176
}
@@ -138,23 +180,22 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id) {
138180

139181
if (id->eth_protocol == ETH_P_IP) {
140182
struct iphdr *ip = (void *)eth + sizeof(*eth);
141-
return fill_iphdr(ip, data_end, id);
183+
return fill_iphdr(ip, data_end, id, flags);
142184
} else if (id->eth_protocol == ETH_P_IPV6) {
143185
struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth);
144-
return fill_ip6hdr(ip6, data_end, id);
186+
return fill_ip6hdr(ip6, data_end, id, flags);
145187
} else {
146188
// TODO : Need to implement other specific ethertypes if needed
147189
// For now other parts of flow id remain zero
148-
memset (&(id->src_ip),0, sizeof(struct in6_addr));
149-
memset (&(id->dst_ip),0, sizeof(struct in6_addr));
190+
memset(&(id->src_ip), 0, sizeof(struct in6_addr));
191+
memset(&(id->dst_ip), 0, sizeof(struct in6_addr));
150192
id->transport_protocol = 0;
151193
id->src_port = 0;
152194
id->dst_port = 0;
153195
}
154196
return SUBMIT;
155197
}
156198

157-
158199
static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
159200
// If sampling is defined, will only parse 1 out of "sampling" flows
160201
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
@@ -166,7 +207,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
166207
flow_id id;
167208
u64 current_time = bpf_ktime_get_ns();
168209
struct ethhdr *eth = data;
169-
if (fill_ethhdr(eth, data_end, &id) == DISCARD) {
210+
u16 flags = 0;
211+
if (fill_ethhdr(eth, data_end, &id, &flags) == DISCARD) {
170212
return TC_ACT_OK;
171213
}
172214
id.if_index = skb->ifindex;
@@ -184,7 +226,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
184226
if (aggregate_flow->start_mono_time_ts == 0) {
185227
aggregate_flow->start_mono_time_ts = current_time;
186228
}
187-
229+
aggregate_flow->flags |= flags;
188230
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
189231
if (trace_messages && ret != 0) {
190232
// usually error -16 (-EBUSY) is printed here.
@@ -198,9 +240,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
198240
// Key does not exist in the map, and will need to create a new entry.
199241
flow_metrics new_flow = {
200242
.packets = 1,
201-
.bytes=skb->len,
243+
.bytes = skb->len,
202244
.start_mono_time_ts = current_time,
203245
.end_mono_time_ts = current_time,
246+
.flags = flags,
204247
};
205248

206249
// even if we know that the entry is new, another CPU might be concurrently inserting a flow
@@ -230,15 +273,14 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
230273
}
231274
}
232275
return TC_ACT_OK;
233-
234276
}
235277
SEC("tc_ingress")
236-
int ingress_flow_parse (struct __sk_buff *skb) {
278+
int ingress_flow_parse(struct __sk_buff *skb) {
237279
return flow_monitor(skb, INGRESS);
238280
}
239281

240282
SEC("tc_egress")
241-
int egress_flow_parse (struct __sk_buff *skb) {
283+
int egress_flow_parse(struct __sk_buff *skb) {
242284
return flow_monitor(skb, EGRESS);
243285
}
244286
char _license[] SEC("license") = "GPL";

examples/flowlogs-dump/server/flowlogs-dump-collector.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func main() {
6161
log.SetFlags(0)
6262
flag.Parse()
6363

64-
receivedRecords := make(chan *pbflow.Records, 100)
64+
receivedRecords := make(chan *pbflow.Records, 1000)
6565
log.Println("starting flowlogs-dump-collector on port", *port)
6666
go func() {
6767
_, err := grpc.StartCollector(*port, receivedRecords)
@@ -72,7 +72,7 @@ func main() {
7272
for records := range receivedRecords {
7373
for _, record := range records.Entries {
7474
if record.EthProtocol == ipv6 {
75-
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d ends: %v\n",
75+
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
7676
ipProto[record.EthProtocol],
7777
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
7878
record.Interface,
@@ -84,10 +84,11 @@ func main() {
8484
record.Direction,
8585
record.Bytes,
8686
record.Packets,
87+
record.Flags,
8788
record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"),
8889
)
8990
} else {
90-
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d ends: %v\n",
91+
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s dir:%d bytes:%d packets:%d flags:%d ends: %v\n",
9192
ipProto[record.EthProtocol],
9293
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
9394
record.Interface,
@@ -99,6 +100,7 @@ func main() {
99100
record.Direction,
100101
record.Bytes,
101102
record.Packets,
103+
record.Flags,
102104
record.TimeFlowEnd.AsTime().Local().Format("15:04:05.000000"),
103105
)
104106
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/caarlos0/env/v6 v6.9.1
77
github.com/cilium/ebpf v0.8.1
88
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
9+
github.com/golang/protobuf v1.5.2
910
github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
1011
github.com/netobserv/gopipes v0.3.0
1112
github.com/paulbellamy/ratecounter v0.2.0
@@ -35,7 +36,6 @@ require (
3536
github.com/go-openapi/jsonreference v0.19.5 // indirect
3637
github.com/go-openapi/swag v0.19.14 // indirect
3738
github.com/gogo/protobuf v1.3.2 // indirect
38-
github.com/golang/protobuf v1.5.2 // indirect
3939
github.com/google/gnostic v0.5.7-v3refs // indirect
4040
github.com/google/go-cmp v0.5.7 // indirect
4141
github.com/google/gofuzz v1.1.0 // indirect

pkg/ebpf/bpf_bpfeb.o

3.01 KB
Binary file not shown.

pkg/ebpf/bpf_bpfel.o

3.43 KB
Binary file not shown.

0 commit comments

Comments
 (0)