Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 57 additions & 60 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@
to/from an interface.

Logic:
1) Store flow information in a per-cpu hash map.
2) Upon flow completion (tcp->fin event), evict the entry from map, and
send to userspace through ringbuffer.
Eviction for non-tcp flows need to done by userspace
3) When the map is full, we send the new flow entry to userspace via ringbuffer,
1) Store flow information in a hash map.
2) Periodically evict the entry from map from userspace.
3) When the map is full/busy, we send the new flow entry to userspace via ringbuffer,
until an entry is available.
4) When hash collision is detected, we send the new entry to userpace via ringbuffer.
*/
#include <vmlinux.h>
#include <bpf_helpers.h>
Expand Down Expand Up @@ -55,16 +52,59 @@
*/
#include "pkt_translation.h"

static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len,
u32 sampling) {
// return 0 on success, 1 if capacity reached
static __always_inline int add_observed_intf(flow_metrics *value, pkt_info *pkt, u32 if_index,
u8 direction) {
if (value->nb_observed_intf >= MAX_OBSERVED_INTERFACES) {
return 1;
}
for (u8 i = 0; i < value->nb_observed_intf; i++) {
if (value->observed_intf[i] == if_index) {
if (value->observed_direction[i] != direction &&
value->observed_direction[i] != OBSERVED_DIRECTION_BOTH) {
// Same interface seen on a different direction => mark as both directions
value->observed_direction[i] = OBSERVED_DIRECTION_BOTH;
Copy link
Contributor

@msherif1234 msherif1234 Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FLP doesn't use the interface direction to figure out the node direction, it does that only by comparing agent IP versus src/dest node IP
I don't think there's anything that reads the IfDirections field, except in the console for display...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the impact of using direction both when we filter flows on direction ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no impact precisely because I keep it separated from the direction enum that is used in filters.
The filters don't check anything in the observed_direction array of stored flows. So it's not impacted by this, and will continue to work simply with egress/ingress directions for each packet filtered individually.

}
// Interface already seen -> skip
return 0;
}
}
value->observed_intf[value->nb_observed_intf] = if_index;
value->observed_direction[value->nb_observed_intf] = direction;
value->nb_observed_intf++;
return 0;
}

static __always_inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt,
u64 len, u32 sampling, u32 if_index,
u8 direction) {
// Count only packets seen from the same interface as previously to avoid duplicate counts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think u need to grap the lock here and free after the else and remove it from add_observred_intf to make sure not reading any value while it can be updated from another thead on different core

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not possible, that's what I said here #509 (comment)

  • when lock is acquired, no call function is allowed. Currently there's two call functions: add_observed_intf and increase_counter
  • reading map value isn't allowed while lock is acquired. For instance, the line if (value->observed_intf[i] == if_index) in add_observed_intf generates an error

If we really wanted to, we could work that around, inlining everything and creating temporary copies of values, but I think it would just make it worse.
The rule of thumb is to keep the lock as little time as possible, which is what I think I'm doing here. If the value is read from another thread before the lock is acquired, I don't think it's a problem, it's just going to get the value before it's updated, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even if u use always_inline, the risk I see assume one thread added one interface and inc the count while another thread already looped over the prev counter and we end up overwritting it, is the error compiler err or verification err can u share ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in the verifier, on startup:
Without inlining:

...ES) {\n\t1528: (25) if r1 > 0x5 goto pc+100 1629: frame1: R1=scalar(umin=6,umax=255,var_off=(0x0; 0xff)) R3=0 R5=1 R6=map_value(id=5,off=48,ks=40,vs=96,imm=0) R7=scalar(umax=4294967295,var_off=(0x0; 0xffffffff)) R8=0 R9=map_value(id=5,off=0,ks=40,vs=96,imm=0) R10=fp0
 fp-32=????mm?? fp-40=00000000 fp-48= fp-56=00000000 fp-64=00000000 fp-72=mmmmmmmm fp-80=fp fp-88=mmmmmmmm fp-96=mmmmmmmm fp-104=mmmmmmmm fp-112=mmmmmmmm fp-120=mmmmmmmm fp-272=ctx fp-280=pkt fp-288=00000000 fp-296= fp-304=1 fp-312=00000000\n\t; bpf_printk(\"couldn't reserve space in the ringbuf. Dropping flow\");\n\t1629: (b7) r1 = 9
                     ; frame1: R1_w=9\n\t1630: (63) *(u32 *)(r10 -264) = r1    ; frame1: R1_w=9 R10=fp0 fp-264=9\n\t1631: (b7) r8 = 1                     ; frame1: R8_w=1\n\t; u32 initVal = 1;\n\t1632: (63) *(u32 *)(r10 -28) = r8     ; frame1: R8_w=1 R10=fp0 fp-32=mmmmmm??\n\t1633: (bf) r2 = r10
                   ; frame1: R2_w=fp0 R10=fp0\n\t;\n\t1634: (07) r2 += -264                 ; frame1: R2_w=fp-264\n\t; error_counter_p = bpf_map_lookup_elem(&global_counters, &key);\n\t1635: (18) r1 = 0xffff8b2008172c00    ; frame1: R1_w=map_ptr(off=0,ks=4,vs=4,imm=0)\n
\t1637: (85) call bpf_map_lookup_elem#1\n\tfunction calls are not allowed while holding a lock\n\tprocessed 482 insns (limit 1000000) max_states_per_insn 2 total_states 29 peak_states 29 mark_read 15" component=ebpf.FlowFetcher
time="2025-01-21T12:34:34Z" level=fatal msg="can't instantiate NetObserv eBPF Agent" error="loading and assigning BPF objects: field TcEgressFlowParse: program tc_egress_flow_parse: load program: invalid argument: function calls are not allowed while holding a lock (280 line(s) omitted)"

=> function calls are not allowed while holding a lock

So we need to refactor and/or use always_inline ... That's actually tricky because add_observed_intf includes increase_counter which contains helper calls (bpf_map_lookup_elem) that cannot be inlined, so in any case that would require some refactoring.

I mentioned a second error that I saw last time, but I don't reproduce it today, so not sure what it was exactly....

So ... let me try to clean up all that and see how that works...

Copy link
Contributor

@msherif1234 msherif1234 Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

u can make add_intf return an error and move the counter outside at the caller to avoid nested functions

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I don't understand why I had an error on reading the value previously, while holding the lock. It doesn't happen this time. => 61cf8a6

int maxReached = 0;
bpf_spin_lock(&aggregate_flow->lock);
aggregate_flow->packets += 1;
aggregate_flow->bytes += len;
aggregate_flow->end_mono_time_ts = pkt->current_ts;
aggregate_flow->flags |= pkt->flags;
aggregate_flow->dscp = pkt->dscp;
aggregate_flow->sampling = sampling;
if (aggregate_flow->if_index_first_seen == if_index) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += len;
aggregate_flow->end_mono_time_ts = pkt->current_ts;
aggregate_flow->flags |= pkt->flags;
aggregate_flow->dscp = pkt->dscp;
aggregate_flow->sampling = sampling;
} else if (if_index != 0) {
// Only add info that we've seen this interface (we can also update end time & flags)
aggregate_flow->end_mono_time_ts = pkt->current_ts;
aggregate_flow->flags |= pkt->flags;
maxReached = add_observed_intf(aggregate_flow, pkt, if_index, direction);
}
bpf_spin_unlock(&aggregate_flow->lock);
if (maxReached > 0) {
BPF_PRINTK("observed interface missed (array capacity reached); ifindex=%d, eth_type=%d, "
"proto=%d, sport=%d, dport=%d\n",
if_index, aggregate_flow->eth_protocol, pkt->id->transport_protocol,
pkt->id->src_port, pkt->id->dst_port);
if (pkt->id->transport_protocol != 0) {
// Only raise counter on non-zero proto; zero proto traffic is very likely to have its interface max count reached
increase_counter(OBSERVED_INTF_MISSED);
}
}
}

static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, int dns_errno) {
Expand All @@ -79,23 +119,6 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt,
}
}

static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8 direction) {
if (value->nb_observed_intf < MAX_OBSERVED_INTERFACES) {
for (u8 i = 0; i < value->nb_observed_intf; i++) {
if (value->observed_intf[i].if_index == if_index &&
value->observed_intf[i].direction == direction) {
return;
}
}
value->observed_intf[value->nb_observed_intf].if_index = if_index;
value->observed_intf[value->nb_observed_intf].direction = direction;
value->nb_observed_intf++;
} else {
increase_counter(OBSERVED_INTF_MISSED);
BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index);
}
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
if (!has_filter_sampling) {
// When no filter sampling is defined, run the sampling check at the earliest for better performances
Expand Down Expand Up @@ -151,34 +174,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
if (aggregate_flow->if_index_first_seen == skb->ifindex) {
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
} else if (skb->ifindex != 0) {
// Only add info that we've seen this interface
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
add_observed_intf(extra_metrics, skb->ifindex, direction);
} else {
additional_metrics new_metrics = {
.eth_protocol = eth_protocol,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
};
add_observed_intf(&new_metrics, skb->ifindex, direction);
long ret =
bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
if (ret == -EEXIST) {
extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
add_observed_intf(extra_metrics, skb->ifindex, direction);
}
} else if (ret != 0 && trace_messages) {
bpf_printk("error creating new observed_intf: %d\n", ret);
}
}
}
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex, direction);
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow;
Expand All @@ -205,7 +201,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex,
direction);
} else {
if (trace_messages) {
bpf_printk("failed to update an exising flow\n");
Expand Down
11 changes: 5 additions & 6 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ typedef __u64 u64;
#define MAX_FILTER_ENTRIES 16
#define MAX_EVENT_MD 8
#define MAX_NETWORK_EVENTS 4
#define MAX_OBSERVED_INTERFACES 4
#define MAX_OBSERVED_INTERFACES 6
#define OBSERVED_DIRECTION_BOTH 3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have enum used to define possible directions https://github.com/netobserv/netobserv-ebpf-agent/blob/main/bpf/types.h#L75
so u can add both there and move MAX to 3

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to change the enum to not mess up anything with the filters, which uses that enum. This value is very specific to the "observed direction" thing


// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum direction_t {
Expand Down Expand Up @@ -104,6 +105,9 @@ typedef struct flow_metrics_t {
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
u8 dscp;
u8 nb_observed_intf;
u8 observed_direction[MAX_OBSERVED_INTERFACES];
u32 observed_intf[MAX_OBSERVED_INTERFACES];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why u changed this instead of having array of struct ?

Copy link
Member Author

@jotak jotak Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's padding optimization. With a struct there was a 3 bytes padding per struct instance, so an extra 18 bytes in total. Separated arrays reduce it to a minimum.

} flow_metrics;

// Force emitting enums/structs into the ELF
Expand Down Expand Up @@ -134,13 +138,8 @@ typedef struct additional_metrics_t {
u16 dport;
u16 zone_id;
} translated_flow;
struct observed_intf_t {
u8 direction;
u32 if_index;
} observed_intf[MAX_OBSERVED_INTERFACES];
u16 eth_protocol;
u8 network_events_idx;
u8 nb_observed_intf;
} additional_metrics;

// Force emitting enums/structs into the ELF
Expand Down
12 changes: 2 additions & 10 deletions bpf/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, pkt_info *pkt,
if ((void *)eth + sizeof(*eth) > data_end) {
return DISCARD;
}
flow_id *id = pkt->id;
*eth_protocol = bpf_ntohs(eth->h_proto);

if (*eth_protocol == ETH_P_IP) {
Expand All @@ -162,16 +161,9 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, pkt_info *pkt,
} else if (*eth_protocol == ETH_P_IPV6) {
struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth);
return fill_ip6hdr(ip6, data_end, pkt);
} else {
// TODO : Need to implement other specific ethertypes if needed
// For now other parts of flow id remain zero
__builtin_memset(&(id->src_ip), 0, sizeof(struct in6_addr));
__builtin_memset(&(id->dst_ip), 0, sizeof(struct in6_addr));
id->transport_protocol = 0;
id->src_port = 0;
id->dst_port = 0;
}
return SUBMIT;
// Only IP-based flows are managed
return DISCARD;
}

static inline bool is_filter_enabled() {
Expand Down
12 changes: 6 additions & 6 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ func TestFlowsAgent_Decoration(t *testing.T) {
BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000,
IfIndexFirstSeen: 1,
DirectionFirstSeen: 1,
},
AdditionalMetrics: &ebpf.BpfAdditionalMetrics{NbObservedIntf: 1,
ObservedIntf: [model.MaxObservedInterfaces]ebpf.BpfObservedIntfT{{IfIndex: 3, Direction: 0}},
NbObservedIntf: 1,
ObservedIntf: [model.MaxObservedInterfaces]uint32{3},
ObservedDirection: [model.MaxObservedInterfaces]uint8{0},
},
}
metrics2 := model.BpfFlowContent{
BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000,
IfIndexFirstSeen: 4,
DirectionFirstSeen: 0,
},
AdditionalMetrics: &ebpf.BpfAdditionalMetrics{NbObservedIntf: 2,
ObservedIntf: [model.MaxObservedInterfaces]ebpf.BpfObservedIntfT{{IfIndex: 1, Direction: 1}, {IfIndex: 99, Direction: 1}},
NbObservedIntf: 2,
ObservedIntf: [model.MaxObservedInterfaces]uint32{1, 99},
ObservedDirection: [model.MaxObservedInterfaces]uint8{1, 1},
},
}
flows := map[ebpf.BpfFlowId]model.BpfFlowContent{
Expand Down
17 changes: 6 additions & 11 deletions pkg/ebpf/bpf_arm64_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
17 changes: 6 additions & 11 deletions pkg/ebpf/bpf_powerpc_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
17 changes: 6 additions & 11 deletions pkg/ebpf/bpf_s390_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
Loading
Loading