1 change: 1 addition & 0 deletions .gitignore
@@ -7,4 +7,5 @@ ebpf-agent.tar
*.pcap
protoc/
release-assets/
perf/

184 changes: 139 additions & 45 deletions bpf/flows.c
@@ -58,48 +58,113 @@
#include "ipsec.h"

// return 0 on success, 1 if capacity reached
// Optimized: loop unrolled and early exits for common cases
static __always_inline int add_observed_intf(flow_metrics *value, pkt_info *pkt, u32 if_index,
u8 direction) {
if (value->nb_observed_intf >= MAX_OBSERVED_INTERFACES) {
return 1;
}
for (u8 i = 0; i < value->nb_observed_intf; i++) {
if (value->observed_intf[i] == if_index) {
if (value->observed_direction[i] != direction &&
value->observed_direction[i] != OBSERVED_DIRECTION_BOTH) {
// Same interface seen on a different direction => mark as both directions
value->observed_direction[i] = OBSERVED_DIRECTION_BOTH;
}
// Interface already seen -> skip
return 0;

// Fast path: unroll loop for small array sizes (most common cases)
Review comment (Member):

I think we must measure that to see how much it improves CPU. The downside I see is that the code is less intuitive / readable, and also it's error prone if we decide to increase MAX_OBSERVED_INTERFACES (we'd need to add new "unrolled" blocks, which can easily be missed)
But optimizations often come with tradeoffs, so that might be OK, depending on the measured improvement.
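
A possible middle ground, sketched here only as an untested idea (it assumes clang fully unrolls constant-bound loops for this BPF target, and would need the same CPU measurement the reviewer asks for): let #pragma unroll generate the per-slot checks so they stay in sync with MAX_OBSERVED_INTERFACES automatically.

static __always_inline int find_observed_intf(flow_metrics *value, u32 if_index) {
    // Sketch: the compiler expands this constant-bound loop into the same
    // kind of straight-line checks as the hand-unrolled version, but the
    // slot count tracks MAX_OBSERVED_INTERFACES so no block can be missed.
#pragma unroll
    for (int i = 0; i < MAX_OBSERVED_INTERFACES; i++) {
        if (i >= value->nb_observed_intf) {
            return -1; // reached the unused tail without a match
        }
        if (value->observed_intf[i] == if_index) {
            return i; // slot of the already-observed interface
        }
    }
    return -1; // array full, no match
}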

// Check each position explicitly to eliminate loop overhead
u8 nb = value->nb_observed_intf;

// Unroll for common cases (0-3 interfaces) - most flows see 1-2 interfaces
if (nb == 0) {
// First interface - no check needed
goto add_new;
}

// Check existing interfaces with unrolled comparisons
if (value->observed_intf[0] == if_index) {
if (value->observed_direction[0] != direction &&
value->observed_direction[0] != OBSERVED_DIRECTION_BOTH) {
value->observed_direction[0] = OBSERVED_DIRECTION_BOTH;
}
return 0;
}

if (nb >= 2 && value->observed_intf[1] == if_index) {
if (value->observed_direction[1] != direction &&
value->observed_direction[1] != OBSERVED_DIRECTION_BOTH) {
value->observed_direction[1] = OBSERVED_DIRECTION_BOTH;
}
return 0;
}
value->observed_intf[value->nb_observed_intf] = if_index;
value->observed_direction[value->nb_observed_intf] = direction;
value->nb_observed_intf++;

if (nb >= 3 && value->observed_intf[2] == if_index) {
if (value->observed_direction[2] != direction &&
value->observed_direction[2] != OBSERVED_DIRECTION_BOTH) {
value->observed_direction[2] = OBSERVED_DIRECTION_BOTH;
}
return 0;
}

// Fully unroll remaining cases (positions 3-5) for MAX_OBSERVED_INTERFACES=6
if (nb >= 4 && value->observed_intf[3] == if_index) {
if (value->observed_direction[3] != direction &&
value->observed_direction[3] != OBSERVED_DIRECTION_BOTH) {
value->observed_direction[3] = OBSERVED_DIRECTION_BOTH;
}
return 0;
}

if (nb >= 5 && value->observed_intf[4] == if_index) {
if (value->observed_direction[4] != direction &&
value->observed_direction[4] != OBSERVED_DIRECTION_BOTH) {
value->observed_direction[4] = OBSERVED_DIRECTION_BOTH;
}
return 0;
}

if (nb >= 6 && value->observed_intf[5] == if_index) {
if (value->observed_direction[5] != direction &&
value->observed_direction[5] != OBSERVED_DIRECTION_BOTH) {
value->observed_direction[5] = OBSERVED_DIRECTION_BOTH;
}
return 0;
}

add_new:
// Not found - add new interface
value->observed_intf[nb] = if_index;
value->observed_direction[nb] = direction;
value->nb_observed_intf = nb + 1;
return 0;
}

static __always_inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt,
u64 len, u32 sampling, u32 if_index,
u8 direction) {
// Count only packets seen from the same interface as previously to avoid duplicate counts
// Using lock-free atomic operations for better performance
int maxReached = 0;
bpf_spin_lock(&aggregate_flow->lock);
if (aggregate_flow->if_index_first_seen == if_index) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += len;

// Read if_index_first_seen once (it's never modified after flow creation)
u32 first_seen = aggregate_flow->if_index_first_seen;

if (first_seen == if_index) {
// Common path: same interface - use atomic operations
__sync_fetch_and_add(&aggregate_flow->packets, 1);
__sync_fetch_and_add(&aggregate_flow->bytes, len);
// Timestamp: use simple write (acceptable if slightly out of order, we want latest anyway)
// On architectures that support it, this will be naturally atomic for aligned 64-bit writes
aggregate_flow->end_mono_time_ts = pkt->current_ts;
// Flags is u16 - eBPF doesn't support atomic ops on 16-bit types
// Use simple write: OR is idempotent, so worst case is missing a flag bit in rare races (acceptable)
aggregate_flow->flags |= pkt->flags;
// DSCP and sampling: simple writes (these are infrequently updated, races are acceptable)
aggregate_flow->dscp = pkt->dscp;
aggregate_flow->sampling = sampling;
} else if (if_index != 0) {
// Only add info that we've seen this interface (we can also update end time & flags)
// Different interface path: update timestamps/flags atomically, then add interface
aggregate_flow->end_mono_time_ts = pkt->current_ts;
// Flags update - use simple write (OR is idempotent, occasional missed flag is acceptable)
aggregate_flow->flags |= pkt->flags;
// Note: add_observed_intf may have races, but worst case is missing one interface entry
// This is acceptable since interface tracking is best-effort metadata
maxReached = add_observed_intf(aggregate_flow, pkt, if_index, direction);
}
bpf_spin_unlock(&aggregate_flow->lock);
if (maxReached > 0) {
BPF_PRINTK("observed interface missed (array capacity reached); ifindex=%d, eth_type=%d, "
"proto=%d, sport=%d, dport=%d\n",
@@ -138,25 +203,50 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}

u16 eth_protocol = 0;
// Initialize pkt_info with only needed fields - compiler zeros the rest
pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));
pkt.current_ts = bpf_ktime_get_ns(); // Record the current time first.
pkt.id = NULL; // Will be set below
pkt.flags = 0;
pkt.l4_hdr = NULL;
pkt.dscp = 0;
pkt.dns_id = 0;
pkt.dns_flags = 0;
pkt.dns_latency = 0;
// DNS name only initialized if DNS tracking enabled (set by track_dns_packet if needed)

flow_id id;
__builtin_memset(&id, 0, sizeof(id));
flow_id id = {0}; // All fields zeroed - needed for flow identification

pkt.current_ts = bpf_ktime_get_ns(); // Record the current time first.
pkt.id = &id;

void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
struct ethhdr *eth = (struct ethhdr *)data;
u64 len = skb->len;
u8 protocol = 0; // Will be set by L3 parsing

if (fill_ethhdr(eth, data_end, &pkt, &eth_protocol) == DISCARD) {
// Optimized: Parse L2+L3 first for early IP filtering
// This allows us to skip L4 parsing if IP-based filtering rejects the packet
if (fill_ethhdr_l3only(eth, data_end, &pkt, &eth_protocol, &protocol) == DISCARD) {
return TC_ACT_OK;
}

// check if this packet needs to be filtered if the filtering feature is enabled
// Early IP filtering: check if we can reject before parsing L4
// This saves L4 parsing for packets that will be rejected anyway
bool filter_enabled = is_filter_enabled();
if (filter_enabled) {
filter_action early_action = MAX_FILTER_ACTIONS;
if (early_ip_filter_check(&id, &early_action, eth_protocol, direction)) {
// Early rejection - skip L4 parsing entirely
if (early_action == REJECT) {
return TC_ACT_OK;
}
}
}
// Parse L4 (needed for full filtering or flow tracking)
parse_l4_after_l3(eth, data_end, &pkt, eth_protocol, protocol);

// Full filter check (now that L4 is parsed if needed)
bool skip =
check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &flow_sampling, direction);
if (has_filter_sampling) {
@@ -183,18 +273,20 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
update_existing_flow(aggregate_flow, &pkt, len, flow_sampling, skb->ifindex, direction);
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow;
__builtin_memset(&new_flow, 0, sizeof(new_flow));
new_flow.if_index_first_seen = skb->ifindex;
new_flow.direction_first_seen = direction;
new_flow.packets = 1;
new_flow.bytes = len;
new_flow.eth_protocol = eth_protocol;
new_flow.start_mono_time_ts = pkt.current_ts;
new_flow.end_mono_time_ts = pkt.current_ts;
new_flow.flags = pkt.flags;
new_flow.dscp = pkt.dscp;
new_flow.sampling = flow_sampling;
// Initialize only the fields we need - compiler will zero the rest
flow_metrics new_flow = {
.if_index_first_seen = skb->ifindex,
.direction_first_seen = direction,
.packets = 1,
.bytes = len,
.eth_protocol = eth_protocol,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.dscp = pkt.dscp,
.sampling = flow_sampling,
.nb_observed_intf = 0 // Explicitly zero for clarity
};
Comment on lines +277 to +289
Review comment (Member):

we used to do that previously, and switched to individual assignments; iirc @msherif1234 found cases where that didn't work as intended, but I can't remember what exactly. @msherif1234 do you remember?
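
If the earlier breakage was padding-related, the hazard is easy to state (a hedged illustration; the struct below is hypothetical, not from this repo): a designated initializer only guarantees the named members are zeroed, while padding bytes may stay undefined, whereas __builtin_memset zeroes every byte. That difference matters whenever the struct is used as a map key, compared bytewise, or must not leak uninitialized stack bytes.

// Sketch of the padding hazard (hypothetical struct for illustration).
struct example_key {
    u8 proto;     // 1 byte, typically followed by 3 padding bytes
    u32 if_index; // 4 bytes
};

static __always_inline int keys_match_bytewise(void) {
    struct example_key a = {.proto = 6, .if_index = 3}; // padding unspecified
    struct example_key b;
    __builtin_memset(&b, 0, sizeof(b)); // padding zeroed
    b.proto = 6;
    b.if_index = 3;
    // May return non-zero even though all named fields are equal.
    return __builtin_memcmp(&a, &b, sizeof(a));
}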

__builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN);

@@ -245,15 +337,17 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
if (extra_metrics != NULL) {
update_dns(extra_metrics, &pkt, dns_errno);
} else {
additional_metrics new_metrics;
__builtin_memset(&new_metrics, 0, sizeof(new_metrics));
new_metrics.start_mono_time_ts = pkt.current_ts;
new_metrics.end_mono_time_ts = pkt.current_ts;
new_metrics.eth_protocol = eth_protocol;
new_metrics.dns_record.id = pkt.dns_id;
new_metrics.dns_record.flags = pkt.dns_flags;
new_metrics.dns_record.latency = pkt.dns_latency;
new_metrics.dns_record.errno = dns_errno;
// Initialize only needed fields - compiler will zero the rest
additional_metrics new_metrics = {
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.eth_protocol = eth_protocol,
.dns_record = {.id = pkt.dns_id,
.flags = pkt.dns_flags,
.latency = pkt.dns_latency,
.errno = dns_errno},
.network_events_idx = 0 // Explicitly zero for clarity
};
long ret =
bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
if (ret != 0) {
60 changes: 58 additions & 2 deletions bpf/flows_filter.h
@@ -59,7 +59,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_

if (rule->do_peerCIDR_lookup) {
struct filter_key_t peerKey;
__builtin_memset(&peerKey, 0, sizeof(peerKey));
// No need to memset - flow_filter_setup_lookup_key will initialize all fields we use
// PeerCIDR lookup will target the opposite IP compared to the original CIDR lookup
// In other words if cidr is using srcIP then peerCIDR will be the dstIP
if (flow_filter_setup_lookup_key(id, &peerKey, &len, &offset, use_src_ip,
@@ -218,6 +218,62 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
return result;
}

/*
* Early IP-only filter check - optimized to skip L4 parsing if IP-based rejection is possible.
* Returns: 1 if packet can be rejected early (IP-only reject rule), 0 if needs full check
* This is a fast path that only checks CIDR matching, not ports/protocols.
*/
static __always_inline int early_ip_filter_check(flow_id *id, filter_action *action,
u16 eth_protocol, u8 direction) {
struct filter_key_t key;
u8 len, offset;
struct filter_value_t *rule;

// Check srcIP CIDR match first
if (flow_filter_setup_lookup_key(id, &key, &len, &offset, true, eth_protocol) < 0) {
return 0; // Need full check
}

rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, &key);
if (rule && rule->action == REJECT) {
// IP matches and action is REJECT - can reject early without checking ports/protocols
// Note: We check direction if rule specifies it
if (rule->direction == MAX_DIRECTION || rule->direction == direction) {
// If rule has port/protocol checks, we can't reject early (would need L4)
// But if it's IP-only (protocol==0, no ports), we can reject now
if (rule->protocol == 0 && rule->dstPortStart == 0 && rule->srcPortStart == 0 &&
rule->portStart == 0 && rule->dstPort1 == 0 && rule->srcPort1 == 0 &&
rule->port1 == 0 && rule->dstPort2 == 0 && rule->srcPort2 == 0 &&
rule->port2 == 0 && !rule->do_peerCIDR_lookup && rule->tcpFlags == 0 &&
rule->icmpType == 0 && rule->filter_drops == 0) {
*action = REJECT;
return 1; // Can reject early
}
}
}

// Check dstIP CIDR match
if (flow_filter_setup_lookup_key(id, &key, &len, &offset, false, eth_protocol) < 0) {
return 0; // Need full check
}

rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, &key);
if (rule && rule->action == REJECT) {
if (rule->direction == MAX_DIRECTION || rule->direction == direction) {
if (rule->protocol == 0 && rule->dstPortStart == 0 && rule->srcPortStart == 0 &&
rule->portStart == 0 && rule->dstPort1 == 0 && rule->srcPort1 == 0 &&
rule->port1 == 0 && rule->dstPort2 == 0 && rule->srcPort2 == 0 &&
rule->port2 == 0 && !rule->do_peerCIDR_lookup && rule->tcpFlags == 0 &&
rule->icmpType == 0 && rule->filter_drops == 0) {
*action = REJECT;
return 1; // Can reject early
}
}
}

return 0; // Need full check with L4
}
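
The two REJECT branches above are byte-for-byte duplicates; a sketch of a shared predicate both lookups could call (the helper name is ours, the field names are taken from the diff):

// Sketch: true only when the rule carries no L4/peer/flag conditions,
// so skipping L4 parsing cannot change the verdict.
static __always_inline bool is_ip_only_reject(struct filter_value_t *rule, u8 direction) {
    if (rule == NULL || rule->action != REJECT) {
        return false;
    }
    if (rule->direction != MAX_DIRECTION && rule->direction != direction) {
        return false;
    }
    return rule->protocol == 0 && rule->dstPortStart == 0 && rule->srcPortStart == 0 &&
           rule->portStart == 0 && rule->dstPort1 == 0 && rule->srcPort1 == 0 &&
           rule->port1 == 0 && rule->dstPort2 == 0 && rule->srcPort2 == 0 &&
           rule->port2 == 0 && !rule->do_peerCIDR_lookup && rule->tcpFlags == 0 &&
           rule->icmpType == 0 && rule->filter_drops == 0;
}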

/*
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
@@ -228,7 +284,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
u8 len, offset;
int result = 0;

__builtin_memset(&key, 0, sizeof(key));
// No need to memset - flow_filter_setup_lookup_key will initialize all fields we use
*action = MAX_FILTER_ACTIONS;

// Lets do first CIDR match using srcIP.
17 changes: 13 additions & 4 deletions bpf/network_events_monitoring.h
@@ -7,11 +7,20 @@

#include "utils.h"

// Optimized: unroll loop for small array (MAX_NETWORK_EVENTS=4)
static inline bool md_already_exists(u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD], u8 *md) {
for (u8 i = 0; i < MAX_NETWORK_EVENTS; i++) {
if (__builtin_memcmp(network_events[i], md, MAX_EVENT_MD) == 0) {
return true;
}
// Unroll comparisons for common case - most flows have 1-2 events
if (__builtin_memcmp(network_events[0], md, MAX_EVENT_MD) == 0) {
return true;
}
if (__builtin_memcmp(network_events[1], md, MAX_EVENT_MD) == 0) {
return true;
}
if (__builtin_memcmp(network_events[2], md, MAX_EVENT_MD) == 0) {
return true;
}
if (__builtin_memcmp(network_events[3], md, MAX_EVENT_MD) == 0) {
return true;
}
return false;
}
2 changes: 1 addition & 1 deletion bpf/types.h
@@ -98,7 +98,7 @@ typedef struct flow_metrics_t {
u8 dst_mac[ETH_ALEN];
// OS interface index
u32 if_index_first_seen;
struct bpf_spin_lock lock;
// Lock removed - using lock-free atomic operations for better performance
u32 sampling;
u8 direction_first_seen;
// The positive errno of a failed map insertion that caused a flow
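
One tradeoff of dropping the lock deserves a note (ours, not from the PR): per-field atomics keep each counter individually consistent but no longer keep the pair consistent, so a concurrent reader can momentarily observe packets ahead of bytes. A minimal sketch, assuming a layout like flow_metrics:

// Sketch: each add is atomic on its own, but the pair is not.
struct counters {
    u64 packets;
    u64 bytes;
};

static __always_inline void count_packet(struct counters *c, u64 len) {
    __sync_fetch_and_add(&c->packets, 1); // becomes visible first
    __sync_fetch_and_add(&c->bytes, len); // a reader landing between the two
                                          // adds sees packets ahead of bytes
}
// For monotonic counters that are re-read periodically this skew is usually
// acceptable; under the removed bpf_spin_lock it could not occur.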