- 
                Notifications
    You must be signed in to change notification settings 
- Fork 49
NETOBSERV-2075: Simplifying observed interfaces - moving it to main map #509
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
01d7160
              78e1c6c
              a8da666
              a401d0e
              54539c0
              33904f8
              c6d1129
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -6,13 +6,10 @@ | |
| to/from an interface. | ||
|  | ||
| Logic: | ||
| 1) Store flow information in a per-cpu hash map. | ||
| 2) Upon flow completion (tcp->fin event), evict the entry from map, and | ||
| send to userspace through ringbuffer. | ||
| Eviction for non-tcp flows need to done by userspace | ||
| 3) When the map is full, we send the new flow entry to userspace via ringbuffer, | ||
| 1) Store flow information in a hash map. | ||
| 2) Periodically evict the entry from map from userspace. | ||
| 3) When the map is full/busy, we send the new flow entry to userspace via ringbuffer, | ||
| until an entry is available. | ||
| 4) When hash collision is detected, we send the new entry to userpace via ringbuffer. | ||
| */ | ||
| #include <vmlinux.h> | ||
| #include <bpf_helpers.h> | ||
|  | @@ -55,16 +52,60 @@ | |
| */ | ||
| #include "pkt_translation.h" | ||
|  | ||
| static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len, | ||
| u32 sampling) { | ||
| // return 0 on success, 1 if capacity reached | ||
| static __always_inline int add_observed_intf(flow_metrics *value, pkt_info *pkt, u32 if_index, | ||
| u8 direction) { | ||
| if (value->nb_observed_intf >= MAX_OBSERVED_INTERFACES) { | ||
| return 1; | ||
| } | ||
| for (u8 i = 0; i < value->nb_observed_intf; i++) { | ||
| if (value->observed_intf[i] == if_index) { | ||
| if (value->observed_direction[i] != direction && | ||
| value->observed_direction[i] != OBSERVED_DIRECTION_BOTH) { | ||
| // Same interface seen on a different direction => mark as both directions | ||
| value->observed_direction[i] = OBSERVED_DIRECTION_BOTH; | ||
| } | ||
| // Interface already seen -> skip | ||
| return 0; | ||
| } | ||
| } | ||
| value->observed_intf[value->nb_observed_intf] = if_index; | ||
| value->observed_direction[value->nb_observed_intf] = direction; | ||
| value->nb_observed_intf++; | ||
| return 0; | ||
| } | ||
|  | ||
| static __always_inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, | ||
| u64 len, u32 sampling, u32 if_index, | ||
| u8 direction) { | ||
| // Count only packets seen from the same interface as previously to avoid duplicate counts | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think u need to grap the lock here and free after the else and remove it from add_observred_intf to make sure not reading any value while it can be updated from another thead on different core There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not possible, that's what I said here #509 (comment) 
 If we really wanted to, we could work that around, inlining everything and creating temporary copies of values, but I think it would just make it worse. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. even if u use always_inline, the risk I see assume one thread added one interface and inc the count while another thread already looped over the prev counter and we end up overwritting it, is the error compiler err or verification err can u share ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's in the verifier, on startup: => function calls are not allowed while holding a lock So we need to refactor and/or use always_inline ... That's actually tricky because  I mentioned a second error that I saw last time, but I don't reproduce it today, so not sure what it was exactly.... So ... let me try to clean up all that and see how that works... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. u can make add_intf return an error and move the counter outside at the caller to avoid nested functions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, I don't understand why I had an error on reading the value previously, while holding the lock. It doesn't happen this time. => 61cf8a6 | ||
| int maxReached = 0; | ||
| bpf_spin_lock(&aggregate_flow->lock); | ||
| aggregate_flow->packets += 1; | ||
| aggregate_flow->bytes += len; | ||
| aggregate_flow->end_mono_time_ts = pkt->current_ts; | ||
| aggregate_flow->flags |= pkt->flags; | ||
| aggregate_flow->dscp = pkt->dscp; | ||
| aggregate_flow->sampling = sampling; | ||
| if (aggregate_flow->if_index_first_seen == if_index) { | ||
| aggregate_flow->packets += 1; | ||
| aggregate_flow->bytes += len; | ||
| aggregate_flow->end_mono_time_ts = pkt->current_ts; | ||
| aggregate_flow->flags |= pkt->flags; | ||
| aggregate_flow->dscp = pkt->dscp; | ||
| aggregate_flow->sampling = sampling; | ||
| } else if (if_index != 0) { | ||
| // Only add info that we've seen this interface (we can also update end time & flags) | ||
| aggregate_flow->end_mono_time_ts = pkt->current_ts; | ||
| aggregate_flow->flags |= pkt->flags; | ||
| maxReached = add_observed_intf(aggregate_flow, pkt, if_index, direction); | ||
| } | ||
| bpf_spin_unlock(&aggregate_flow->lock); | ||
| if (maxReached > 0) { | ||
| BPF_PRINTK("observed interface missed (array capacity reached); ifindex=%d, eth_type=%d, " | ||
| "proto=%d, sport=%d, dport=%d\n", | ||
| if_index, aggregate_flow->eth_protocol, pkt->id->transport_protocol, | ||
| pkt->id->src_port, pkt->id->dst_port); | ||
| if (pkt->id->transport_protocol != 0 && | ||
|          | ||
| (pkt->id->src_port != 0 || pkt->id->dst_port != 0)) { | ||
| // Only raise counter on non-zero proto/ports; zero proto/ports traffic is very likely to have its interface max count reached | ||
| increase_counter(OBSERVED_INTF_MISSED); | ||
| } | ||
| } | ||
| } | ||
|  | ||
| static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, int dns_errno) { | ||
|  | @@ -79,23 +120,6 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, | |
| } | ||
| } | ||
|  | ||
| static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8 direction) { | ||
| if (value->nb_observed_intf < MAX_OBSERVED_INTERFACES) { | ||
| for (u8 i = 0; i < value->nb_observed_intf; i++) { | ||
| if (value->observed_intf[i].if_index == if_index && | ||
| value->observed_intf[i].direction == direction) { | ||
| return; | ||
| } | ||
| } | ||
| value->observed_intf[value->nb_observed_intf].if_index = if_index; | ||
| value->observed_intf[value->nb_observed_intf].direction = direction; | ||
| value->nb_observed_intf++; | ||
| } else { | ||
| increase_counter(OBSERVED_INTF_MISSED); | ||
| BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index); | ||
| } | ||
| } | ||
|  | ||
| static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { | ||
| if (!has_filter_sampling) { | ||
| // When no filter sampling is defined, run the sampling check at the earliest for better performances | ||
|  | @@ -151,34 +175,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { | |
| } | ||
| flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id); | ||
| if (aggregate_flow != NULL) { | ||
| if (aggregate_flow->if_index_first_seen == skb->ifindex) { | ||
| update_existing_flow(aggregate_flow, &pkt, len, filter_sampling); | ||
| } else if (skb->ifindex != 0) { | ||
| // Only add info that we've seen this interface | ||
| additional_metrics *extra_metrics = | ||
| (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id); | ||
| if (extra_metrics != NULL) { | ||
| add_observed_intf(extra_metrics, skb->ifindex, direction); | ||
| } else { | ||
| additional_metrics new_metrics = { | ||
| .eth_protocol = eth_protocol, | ||
| .start_mono_time_ts = pkt.current_ts, | ||
| .end_mono_time_ts = pkt.current_ts, | ||
| }; | ||
| add_observed_intf(&new_metrics, skb->ifindex, direction); | ||
| long ret = | ||
| bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST); | ||
| if (ret == -EEXIST) { | ||
| extra_metrics = | ||
| (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id); | ||
| if (extra_metrics != NULL) { | ||
| add_observed_intf(extra_metrics, skb->ifindex, direction); | ||
| } | ||
| } else if (ret != 0 && trace_messages) { | ||
| bpf_printk("error creating new observed_intf: %d\n", ret); | ||
| } | ||
| } | ||
| } | ||
| update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex, direction); | ||
| } else { | ||
| // Key does not exist in the map, and will need to create a new entry. | ||
| flow_metrics new_flow; | ||
|  | @@ -205,7 +202,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { | |
| flow_metrics *aggregate_flow = | ||
| (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id); | ||
| if (aggregate_flow != NULL) { | ||
| update_existing_flow(aggregate_flow, &pkt, len, filter_sampling); | ||
| update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex, | ||
| direction); | ||
| } else { | ||
| if (trace_messages) { | ||
| bpf_printk("failed to update an exising flow\n"); | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -66,7 +66,8 @@ typedef __u64 u64; | |
| #define MAX_FILTER_ENTRIES 16 | ||
| #define MAX_EVENT_MD 8 | ||
| #define MAX_NETWORK_EVENTS 4 | ||
| #define MAX_OBSERVED_INTERFACES 4 | ||
| #define MAX_OBSERVED_INTERFACES 6 | ||
| #define OBSERVED_DIRECTION_BOTH 3 | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we have enum used to define possible directions https://github.com/netobserv/netobserv-ebpf-agent/blob/main/bpf/types.h#L75 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't want to change the enum to not mess up anything with the filters, which uses that enum. This value is very specific to the "observed direction" thing | ||
|  | ||
| // according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml | ||
| typedef enum direction_t { | ||
|  | @@ -104,6 +105,9 @@ typedef struct flow_metrics_t { | |
| // https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md | ||
| u8 errno; | ||
| u8 dscp; | ||
| u8 nb_observed_intf; | ||
| u8 observed_direction[MAX_OBSERVED_INTERFACES]; | ||
| u32 observed_intf[MAX_OBSERVED_INTERFACES]; | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why u changed this instead of having array of struct ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's padding optimization. With a struct there was a 3 bytes padding per struct instance, so an extra 18 bytes in total. Separated arrays reduce it to a minimum. | ||
| } flow_metrics; | ||
|  | ||
| // Force emitting enums/structs into the ELF | ||
|  | @@ -134,13 +138,8 @@ typedef struct additional_metrics_t { | |
| u16 dport; | ||
| u16 zone_id; | ||
| } translated_flow; | ||
| struct observed_intf_t { | ||
| u8 direction; | ||
| u32 if_index; | ||
| } observed_intf[MAX_OBSERVED_INTERFACES]; | ||
| u16 eth_protocol; | ||
| u8 network_events_idx; | ||
| u8 nb_observed_intf; | ||
| } additional_metrics; | ||
|  | ||
| // Force emitting enums/structs into the ELF | ||
|  | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
won't this case issues in flp generating the other flp direction field ?
https://github.com/netobserv/flowlogs-pipeline/blob/main/pkg/pipeline/transform/transform_network_direction.go#L32
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FLP doesn't use the interface direction to figure out the node direction, it does that only by comparing agent IP versus src/dest node IP
I don't think there's anything that reads the IfDirections field, except in the console for display...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the impact of using direction both when we filter flows on direction ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no impact precisely because I keep it separated from the direction enum that is used in filters.
The filters don't check anything in the observed_direction array of stored flows. So it's not impacted by this, and will continue to work simply with egress/ingress directions for each packet filtered individually.