diff --git a/bpf/flows.c b/bpf/flows.c index 9eb67b7d6..7294df318 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -6,13 +6,10 @@ to/from an interface. Logic: - 1) Store flow information in a per-cpu hash map. - 2) Upon flow completion (tcp->fin event), evict the entry from map, and - send to userspace through ringbuffer. - Eviction for non-tcp flows need to done by userspace - 3) When the map is full, we send the new flow entry to userspace via ringbuffer, + 1) Store flow information in a hash map. + 2) Periodically evict the entry from map from userspace. + 3) When the map is full/busy, we send the new flow entry to userspace via ringbuffer, until an entry is available. - 4) When hash collision is detected, we send the new entry to userpace via ringbuffer. */ #include #include @@ -55,16 +52,59 @@ */ #include "pkt_translation.h" -static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len, - u32 sampling) { +// return 0 on success, 1 if capacity reached +static __always_inline int add_observed_intf(flow_metrics *value, pkt_info *pkt, u32 if_index, + u8 direction) { + if (value->nb_observed_intf >= MAX_OBSERVED_INTERFACES) { + return 1; + } + for (u8 i = 0; i < value->nb_observed_intf; i++) { + if (value->observed_intf[i] == if_index) { + if (value->observed_direction[i] != direction && + value->observed_direction[i] != OBSERVED_DIRECTION_BOTH) { + // Same interface seen on a different direction => mark as both directions + value->observed_direction[i] = OBSERVED_DIRECTION_BOTH; + } + // Interface already seen -> skip + return 0; + } + } + value->observed_intf[value->nb_observed_intf] = if_index; + value->observed_direction[value->nb_observed_intf] = direction; + value->nb_observed_intf++; + return 0; +} + +static __always_inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, + u64 len, u32 sampling, u32 if_index, + u8 direction) { + // Count only packets seen from the same interface as previously to avoid duplicate counts + int maxReached = 0; bpf_spin_lock(&aggregate_flow->lock); - aggregate_flow->packets += 1; - aggregate_flow->bytes += len; - aggregate_flow->end_mono_time_ts = pkt->current_ts; - aggregate_flow->flags |= pkt->flags; - aggregate_flow->dscp = pkt->dscp; - aggregate_flow->sampling = sampling; + if (aggregate_flow->if_index_first_seen == if_index) { + aggregate_flow->packets += 1; + aggregate_flow->bytes += len; + aggregate_flow->end_mono_time_ts = pkt->current_ts; + aggregate_flow->flags |= pkt->flags; + aggregate_flow->dscp = pkt->dscp; + aggregate_flow->sampling = sampling; + } else if (if_index != 0) { + // Only add info that we've seen this interface (we can also update end time & flags) + aggregate_flow->end_mono_time_ts = pkt->current_ts; + aggregate_flow->flags |= pkt->flags; + maxReached = add_observed_intf(aggregate_flow, pkt, if_index, direction); + } bpf_spin_unlock(&aggregate_flow->lock); + if (maxReached > 0) { + BPF_PRINTK("observed interface missed (array capacity reached); ifindex=%d, eth_type=%d, " + "proto=%d, sport=%d, dport=%d\n", + if_index, aggregate_flow->eth_protocol, pkt->id->transport_protocol, + pkt->id->src_port, pkt->id->dst_port); + if (pkt->id->transport_protocol != 0) { + // Only raise counter on non-zero proto; zero proto traffic is very likely to have its interface max count reached + increase_counter(OBSERVED_INTF_MISSED); + } + } } static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, int dns_errno) { @@ -79,23 +119,6 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, } } -static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8 direction) { - if (value->nb_observed_intf < MAX_OBSERVED_INTERFACES) { - for (u8 i = 0; i < value->nb_observed_intf; i++) { - if (value->observed_intf[i].if_index == if_index && - value->observed_intf[i].direction == direction) { - return; - } - } - value->observed_intf[value->nb_observed_intf].if_index = if_index; - value->observed_intf[value->nb_observed_intf].direction = direction; - value->nb_observed_intf++; - } else { - increase_counter(OBSERVED_INTF_MISSED); - BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index); - } -} - static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { if (!has_filter_sampling) { // When no filter sampling is defined, run the sampling check at the earliest for better performances @@ -151,34 +174,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { } flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id); if (aggregate_flow != NULL) { - if (aggregate_flow->if_index_first_seen == skb->ifindex) { - update_existing_flow(aggregate_flow, &pkt, len, filter_sampling); - } else if (skb->ifindex != 0) { - // Only add info that we've seen this interface - additional_metrics *extra_metrics = - (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id); - if (extra_metrics != NULL) { - add_observed_intf(extra_metrics, skb->ifindex, direction); - } else { - additional_metrics new_metrics = { - .eth_protocol = eth_protocol, - .start_mono_time_ts = pkt.current_ts, - .end_mono_time_ts = pkt.current_ts, - }; - add_observed_intf(&new_metrics, skb->ifindex, direction); - long ret = - bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST); - if (ret == -EEXIST) { - extra_metrics = - (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id); - if (extra_metrics != NULL) { - add_observed_intf(extra_metrics, skb->ifindex, direction); - } - } else if (ret != 0 && trace_messages) { - bpf_printk("error creating new observed_intf: %d\n", ret); - } - } - } + update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex, direction); } else { // Key does not exist in the map, and will need to create a new entry. flow_metrics new_flow; @@ -205,7 +201,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id); if (aggregate_flow != NULL) { - update_existing_flow(aggregate_flow, &pkt, len, filter_sampling); + update_existing_flow(aggregate_flow, &pkt, len, filter_sampling, skb->ifindex, + direction); } else { if (trace_messages) { bpf_printk("failed to update an exising flow\n"); diff --git a/bpf/types.h b/bpf/types.h index 90f91d79f..f5cd7fba5 100644 --- a/bpf/types.h +++ b/bpf/types.h @@ -66,7 +66,8 @@ typedef __u64 u64; #define MAX_FILTER_ENTRIES 16 #define MAX_EVENT_MD 8 #define MAX_NETWORK_EVENTS 4 -#define MAX_OBSERVED_INTERFACES 4 +#define MAX_OBSERVED_INTERFACES 6 +#define OBSERVED_DIRECTION_BOTH 3 // according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml typedef enum direction_t { @@ -104,6 +105,9 @@ typedef struct flow_metrics_t { // https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md u8 errno; u8 dscp; + u8 nb_observed_intf; + u8 observed_direction[MAX_OBSERVED_INTERFACES]; + u32 observed_intf[MAX_OBSERVED_INTERFACES]; } flow_metrics; // Force emitting enums/structs into the ELF @@ -134,13 +138,8 @@ typedef struct additional_metrics_t { u16 dport; u16 zone_id; } translated_flow; - struct observed_intf_t { - u8 direction; - u32 if_index; - } observed_intf[MAX_OBSERVED_INTERFACES]; u16 eth_protocol; u8 network_events_idx; - u8 nb_observed_intf; } additional_metrics; // Force emitting enums/structs into the ELF diff --git a/bpf/utils.h b/bpf/utils.h index 2e6034b35..75657633b 100644 --- a/bpf/utils.h +++ b/bpf/utils.h @@ -153,7 +153,6 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, pkt_info *pkt, if ((void *)eth + sizeof(*eth) > data_end) { return DISCARD; } - flow_id *id = pkt->id; *eth_protocol = bpf_ntohs(eth->h_proto); if (*eth_protocol == ETH_P_IP) { @@ -162,16 +161,9 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, pkt_info *pkt, } else if (*eth_protocol == ETH_P_IPV6) { struct ipv6hdr *ip6 = (void *)eth + sizeof(*eth); return fill_ip6hdr(ip6, data_end, pkt); - } else { - // TODO : Need to implement other specific ethertypes if needed - // For now other parts of flow id remain zero - __builtin_memset(&(id->src_ip), 0, sizeof(struct in6_addr)); - __builtin_memset(&(id->dst_ip), 0, sizeof(struct in6_addr)); - id->transport_protocol = 0; - id->src_port = 0; - id->dst_port = 0; } - return SUBMIT; + // Only IP-based flows are managed + return DISCARD; } static inline bool is_filter_enabled() { diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 5c887ffdb..0327db32c 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -65,18 +65,18 @@ func TestFlowsAgent_Decoration(t *testing.T) { BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000, IfIndexFirstSeen: 1, DirectionFirstSeen: 1, - }, - AdditionalMetrics: &ebpf.BpfAdditionalMetrics{NbObservedIntf: 1, - ObservedIntf: [model.MaxObservedInterfaces]ebpf.BpfObservedIntfT{{IfIndex: 3, Direction: 0}}, + NbObservedIntf: 1, + ObservedIntf: [model.MaxObservedInterfaces]uint32{3}, + ObservedDirection: [model.MaxObservedInterfaces]uint8{0}, }, } metrics2 := model.BpfFlowContent{ BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000, IfIndexFirstSeen: 4, DirectionFirstSeen: 0, - }, - AdditionalMetrics: &ebpf.BpfAdditionalMetrics{NbObservedIntf: 2, - ObservedIntf: [model.MaxObservedInterfaces]ebpf.BpfObservedIntfT{{IfIndex: 1, Direction: 1}, {IfIndex: 99, Direction: 1}}, + NbObservedIntf: 2, + ObservedIntf: [model.MaxObservedInterfaces]uint32{1, 99}, + ObservedDirection: [model.MaxObservedInterfaces]uint8{1, 1}, }, } flows := map[ebpf.BpfFlowId]model.BpfFlowContent{ diff --git a/pkg/ebpf/bpf_arm64_bpfel.go b/pkg/ebpf/bpf_arm64_bpfel.go index 019c122b4..9dbadab13 100644 --- a/pkg/ebpf/bpf_arm64_bpfel.go +++ b/pkg/ebpf/bpf_arm64_bpfel.go @@ -20,12 +20,9 @@ type BpfAdditionalMetrics struct { FlowRtt uint64 NetworkEvents [4][8]uint8 TranslatedFlow BpfTranslatedFlowT - _ [2]byte - ObservedIntf [4]BpfObservedIntfT EthProtocol uint16 NetworkEventsIdx uint8 - NbObservedIntf uint8 - _ [4]byte + _ [7]byte } type BpfDirectionT uint32 @@ -124,7 +121,11 @@ type BpfFlowMetricsT struct { DirectionFirstSeen uint8 Errno uint8 Dscp uint8 - _ [5]byte + NbObservedIntf uint8 + ObservedDirection [6]uint8 + _ [2]byte + ObservedIntf [6]uint32 + _ [4]byte } type BpfFlowRecordT struct { @@ -148,12 +149,6 @@ const ( BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10 ) -type BpfObservedIntfT struct { - Direction uint8 - _ [3]byte - IfIndex uint32 -} - type BpfPktDropsT struct { Bytes uint64 Packets uint32 diff --git a/pkg/ebpf/bpf_arm64_bpfel.o b/pkg/ebpf/bpf_arm64_bpfel.o index ea9d91d42..8f6bce506 100644 Binary files a/pkg/ebpf/bpf_arm64_bpfel.o and b/pkg/ebpf/bpf_arm64_bpfel.o differ diff --git a/pkg/ebpf/bpf_powerpc_bpfel.go b/pkg/ebpf/bpf_powerpc_bpfel.go index e989ff4d0..55657a4b5 100644 --- a/pkg/ebpf/bpf_powerpc_bpfel.go +++ b/pkg/ebpf/bpf_powerpc_bpfel.go @@ -20,12 +20,9 @@ type BpfAdditionalMetrics struct { FlowRtt uint64 NetworkEvents [4][8]uint8 TranslatedFlow BpfTranslatedFlowT - _ [2]byte - ObservedIntf [4]BpfObservedIntfT EthProtocol uint16 NetworkEventsIdx uint8 - NbObservedIntf uint8 - _ [4]byte + _ [7]byte } type BpfDirectionT uint32 @@ -124,7 +121,11 @@ type BpfFlowMetricsT struct { DirectionFirstSeen uint8 Errno uint8 Dscp uint8 - _ [5]byte + NbObservedIntf uint8 + ObservedDirection [6]uint8 + _ [2]byte + ObservedIntf [6]uint32 + _ [4]byte } type BpfFlowRecordT struct { @@ -148,12 +149,6 @@ const ( BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10 ) -type BpfObservedIntfT struct { - Direction uint8 - _ [3]byte - IfIndex uint32 -} - type BpfPktDropsT struct { Bytes uint64 Packets uint32 diff --git a/pkg/ebpf/bpf_powerpc_bpfel.o b/pkg/ebpf/bpf_powerpc_bpfel.o index 77cb37b00..3a4d1c0b5 100644 Binary files a/pkg/ebpf/bpf_powerpc_bpfel.o and b/pkg/ebpf/bpf_powerpc_bpfel.o differ diff --git a/pkg/ebpf/bpf_s390_bpfeb.go b/pkg/ebpf/bpf_s390_bpfeb.go index 1b0b501f6..370a8f5e4 100644 --- a/pkg/ebpf/bpf_s390_bpfeb.go +++ b/pkg/ebpf/bpf_s390_bpfeb.go @@ -20,12 +20,9 @@ type BpfAdditionalMetrics struct { FlowRtt uint64 NetworkEvents [4][8]uint8 TranslatedFlow BpfTranslatedFlowT - _ [2]byte - ObservedIntf [4]BpfObservedIntfT EthProtocol uint16 NetworkEventsIdx uint8 - NbObservedIntf uint8 - _ [4]byte + _ [7]byte } type BpfDirectionT uint32 @@ -124,7 +121,11 @@ type BpfFlowMetricsT struct { DirectionFirstSeen uint8 Errno uint8 Dscp uint8 - _ [5]byte + NbObservedIntf uint8 + ObservedDirection [6]uint8 + _ [2]byte + ObservedIntf [6]uint32 + _ [4]byte } type BpfFlowRecordT struct { @@ -148,12 +149,6 @@ const ( BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10 ) -type BpfObservedIntfT struct { - Direction uint8 - _ [3]byte - IfIndex uint32 -} - type BpfPktDropsT struct { Bytes uint64 Packets uint32 diff --git a/pkg/ebpf/bpf_s390_bpfeb.o b/pkg/ebpf/bpf_s390_bpfeb.o index 315530539..0a211f109 100644 Binary files a/pkg/ebpf/bpf_s390_bpfeb.o and b/pkg/ebpf/bpf_s390_bpfeb.o differ diff --git a/pkg/ebpf/bpf_x86_bpfel.go b/pkg/ebpf/bpf_x86_bpfel.go index 3fe711208..690d8d922 100644 --- a/pkg/ebpf/bpf_x86_bpfel.go +++ b/pkg/ebpf/bpf_x86_bpfel.go @@ -20,12 +20,9 @@ type BpfAdditionalMetrics struct { FlowRtt uint64 NetworkEvents [4][8]uint8 TranslatedFlow BpfTranslatedFlowT - _ [2]byte - ObservedIntf [4]BpfObservedIntfT EthProtocol uint16 NetworkEventsIdx uint8 - NbObservedIntf uint8 - _ [4]byte + _ [7]byte } type BpfDirectionT uint32 @@ -124,7 +121,11 @@ type BpfFlowMetricsT struct { DirectionFirstSeen uint8 Errno uint8 Dscp uint8 - _ [5]byte + NbObservedIntf uint8 + ObservedDirection [6]uint8 + _ [2]byte + ObservedIntf [6]uint32 + _ [4]byte } type BpfFlowRecordT struct { @@ -148,12 +149,6 @@ const ( BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10 ) -type BpfObservedIntfT struct { - Direction uint8 - _ [3]byte - IfIndex uint32 -} - type BpfPktDropsT struct { Bytes uint64 Packets uint32 diff --git a/pkg/ebpf/bpf_x86_bpfel.o b/pkg/ebpf/bpf_x86_bpfel.o index aa3039e88..037d1ce24 100644 Binary files a/pkg/ebpf/bpf_x86_bpfel.o and b/pkg/ebpf/bpf_x86_bpfel.o differ diff --git a/pkg/ebpf/gen.go b/pkg/ebpf/gen.go index a95d9ba48..3ec45b49c 100644 --- a/pkg/ebpf/gen.go +++ b/pkg/ebpf/gen.go @@ -1,4 +1,4 @@ package ebpf // $BPF_CLANG and $BPF_CFLAGS are set by the Makefile. -//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t -type direction_t -type filter_action_t -type tcp_flags_t -type translated_flow_t -type observed_intf_t Bpf ../../bpf/flows.c -- -I../../bpf/headers +//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t -type direction_t -type filter_action_t -type tcp_flags_t -type translated_flow_t Bpf ../../bpf/flows.c -- -I../../bpf/headers diff --git a/pkg/exporter/grpc_proto.go b/pkg/exporter/grpc_proto.go index 9680a95ba..cec5e2a11 100644 --- a/pkg/exporter/grpc_proto.go +++ b/pkg/exporter/grpc_proto.go @@ -57,7 +57,7 @@ func (g *GRPCProto) ExportFlows(input <-chan []*model.Record) { for _, pbRecords := range pbflow.FlowsToPB(inputRecords, g.maxFlowsPerMessage) { log.Debugf("sending %d records", len(pbRecords.Entries)) if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil { - g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage").Inc() + g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage", metrics.HighSeverity).Inc() log.WithError(err).Error("couldn't send flow records to collector") } g.batchCounter.Inc() @@ -66,6 +66,6 @@ func (g *GRPCProto) ExportFlows(input <-chan []*model.Record) { } if err := g.clientConn.Close(); err != nil { log.WithError(err).Warn("couldn't close flow export client") - g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient").Inc() + g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient", metrics.MediumSeverity).Inc() } } diff --git a/pkg/exporter/kafka_proto.go b/pkg/exporter/kafka_proto.go index 169562c57..95ee3c378 100644 --- a/pkg/exporter/kafka_proto.go +++ b/pkg/exporter/kafka_proto.go @@ -53,7 +53,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*model.Record) { pbBytes, err := proto.Marshal(pbflow.FlowToPB(record)) if err != nil { klog.WithError(err).Debug("can't encode protobuf message. Ignoring") - kp.Metrics.Errors.WithErrorName(componentKafka, "CannotEncodeMessage").Inc() + kp.Metrics.Errors.WithErrorName(componentKafka, "CannotEncodeMessage", metrics.HighSeverity).Inc() continue } msgs = append(msgs, kafkago.Message{Value: pbBytes, Key: getFlowKey(record)}) @@ -61,7 +61,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*model.Record) { if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil { klog.WithError(err).Error("can't write messages into Kafka") - kp.Metrics.Errors.WithErrorName(componentKafka, "CannotWriteMessage").Inc() + kp.Metrics.Errors.WithErrorName(componentKafka, "CannotWriteMessage", metrics.HighSeverity).Inc() } kp.Metrics.EvictionCounter.WithSource(componentKafka).Inc() kp.Metrics.EvictedFlowsCounter.WithSource(componentKafka).Add(float64(len(records))) diff --git a/pkg/flow/tracer_ringbuf.go b/pkg/flow/tracer_ringbuf.go index a92f7d38d..fb61e929a 100644 --- a/pkg/flow/tracer_ringbuf.go +++ b/pkg/flow/tracer_ringbuf.go @@ -79,13 +79,13 @@ func (m *RingBufTracer) TraceLoop(ctx context.Context) node.StartFunc[*model.Raw func (m *RingBufTracer) listenAndForwardRingBuffer(debugging bool, forwardCh chan<- *model.RawRecord) error { event, err := m.ringBuffer.ReadRingBuf() if err != nil { - m.metrics.Errors.WithErrorName("ringbuffer", "CannotReadRingbuffer").Inc() + m.metrics.Errors.WithErrorName("ringbuffer", "CannotReadRingbuffer", metrics.HighSeverity).Inc() return fmt.Errorf("reading from ring buffer: %w", err) } // Parses the ringbuf event entry into an Event structure. readFlow, err := model.ReadFrom(bytes.NewBuffer(event.RawSample)) if err != nil { - m.metrics.Errors.WithErrorName("ringbuffer", "CannotParseRingbuffer").Inc() + m.metrics.Errors.WithErrorName("ringbuffer", "CannotParseRingbuffer", metrics.HighSeverity).Inc() return fmt.Errorf("parsing data received from the ring buffer: %w", err) } mapFullError := readFlow.Metrics.Errno == uint8(syscall.E2BIG) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 413a8dd51..0f4db3506 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -123,6 +123,7 @@ var ( TypeCounter, "component", "error", + "severity", ) flowEnrichmentCounterCounter = defineMetric( "flows_enrichment_total", @@ -307,6 +308,14 @@ type ErrorCounter struct { vec *prometheus.CounterVec } -func (c *ErrorCounter) WithErrorName(component, errName string) prometheus.Counter { - return c.vec.WithLabelValues(component, errName) +type ErrorSeverity string + +const ( + HighSeverity ErrorSeverity = "high" + MediumSeverity ErrorSeverity = "medium" + LowSeverity ErrorSeverity = "low" +) + +func (c *ErrorCounter) WithErrorName(component, errName string, severity ErrorSeverity) prometheus.Counter { + return c.vec.WithLabelValues(component, errName, string(severity)) } diff --git a/pkg/model/flow_content.go b/pkg/model/flow_content.go index 155926f37..7c3202012 100644 --- a/pkg/model/flow_content.go +++ b/pkg/model/flow_content.go @@ -116,26 +116,6 @@ func (p *BpfFlowContent) AccumulateAdditional(other *ebpf.BpfAdditionalMetrics) if !AllZeroIP(IP(other.TranslatedFlow.Saddr)) && !AllZeroIP(IP(other.TranslatedFlow.Daddr)) { p.AdditionalMetrics.TranslatedFlow = other.TranslatedFlow } - // Accumulate interfaces + directions - accumulateInterfaces(&p.AdditionalMetrics.NbObservedIntf, &p.AdditionalMetrics.ObservedIntf, other.NbObservedIntf, other.ObservedIntf) -} - -func accumulateInterfaces(dstSize *uint8, dstIntf *[MaxObservedInterfaces]ebpf.BpfObservedIntfT, srcSize uint8, srcIntf [MaxObservedInterfaces]ebpf.BpfObservedIntfT) { - iObs := uint8(0) -outer: - for *dstSize < uint8(len(dstIntf)) && iObs < srcSize { - for u := uint8(0); u < *dstSize; u++ { - if dstIntf[u].Direction == srcIntf[iObs].Direction && - dstIntf[u].IfIndex == srcIntf[iObs].IfIndex { - // Ignore if already exists - iObs++ - continue outer - } - } - dstIntf[*dstSize] = srcIntf[iObs] - *dstSize++ - iObs++ - } } func allZerosMac(s [6]uint8) bool { diff --git a/pkg/model/flow_content_test.go b/pkg/model/flow_content_test.go index 8d0064390..1bc8d0278 100644 --- a/pkg/model/flow_content_test.go +++ b/pkg/model/flow_content_test.go @@ -204,68 +204,6 @@ func TestAccumulate(t *testing.T) { PktDrops: ebpf.BpfPktDropsT{Packets: 5, Bytes: 1000, LatestFlags: 1}, }, }, - }, { - name: "merge interfaces", - inputFlow: ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, - inputAdditional: []ebpf.BpfAdditionalMetrics{ - { - NbObservedIntf: 2, - ObservedIntf: [MaxObservedInterfaces]ebpf.BpfObservedIntfT{ - {Direction: 0, IfIndex: 1}, - {Direction: 1, IfIndex: 2}, - }, - }, - { - NbObservedIntf: 2, - ObservedIntf: [MaxObservedInterfaces]ebpf.BpfObservedIntfT{ - {Direction: 0, IfIndex: 1}, // duplicate - {Direction: 1, IfIndex: 3}, - }, - }, - }, - expected: BpfFlowContent{ - BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, - AdditionalMetrics: &ebpf.BpfAdditionalMetrics{ - NbObservedIntf: 3, - ObservedIntf: [MaxObservedInterfaces]ebpf.BpfObservedIntfT{ - {Direction: 0, IfIndex: 1}, - {Direction: 1, IfIndex: 2}, - {Direction: 1, IfIndex: 3}, - }, - }, - }, - }, { - name: "ignore too many interfaces", - inputFlow: ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, - inputAdditional: []ebpf.BpfAdditionalMetrics{ - { - NbObservedIntf: 3, - ObservedIntf: [MaxObservedInterfaces]ebpf.BpfObservedIntfT{ - {Direction: 0, IfIndex: 1}, - {Direction: 1, IfIndex: 2}, - {Direction: 1, IfIndex: 3}, - }, - }, - { - NbObservedIntf: 2, - ObservedIntf: [MaxObservedInterfaces]ebpf.BpfObservedIntfT{ - {Direction: 0, IfIndex: 4}, - {Direction: 1, IfIndex: 5}, - }, - }, - }, - expected: BpfFlowContent{ - BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1}, - AdditionalMetrics: &ebpf.BpfAdditionalMetrics{ - NbObservedIntf: 4, - ObservedIntf: [MaxObservedInterfaces]ebpf.BpfObservedIntfT{ - {Direction: 0, IfIndex: 1}, - {Direction: 1, IfIndex: 2}, - {Direction: 1, IfIndex: 3}, - {Direction: 0, IfIndex: 4}, - }, - }, - }, }} for i, tc := range tcs { t.Run(fmt.Sprint(i), func(t *testing.T) { diff --git a/pkg/model/record.go b/pkg/model/record.go index 80df69e92..35a6c2123 100644 --- a/pkg/model/record.go +++ b/pkg/model/record.go @@ -25,7 +25,7 @@ const ( IPv6Type = 0x86DD NetworkEventsMaxEventsMD = 8 MaxNetworkEvents = 4 - MaxObservedInterfaces = 4 + MaxObservedInterfaces = 6 ) var recordLog = logrus.WithField("component", "model") @@ -97,14 +97,15 @@ func NewRecord( int(metrics.DirectionFirstSeen), s, record.UdnsCache)} + for i := uint8(0); i < record.Metrics.NbObservedIntf; i++ { + record.Interfaces = append(record.Interfaces, NewIntfDirUdn( + interfaceNamer(int(metrics.ObservedIntf[i])), + int(metrics.ObservedDirection[i]), + s, record.UdnsCache, + )) + } + if metrics.AdditionalMetrics != nil { - for i := uint8(0); i < record.Metrics.AdditionalMetrics.NbObservedIntf; i++ { - record.Interfaces = append(record.Interfaces, NewIntfDirUdn( - interfaceNamer(int(metrics.AdditionalMetrics.ObservedIntf[i].IfIndex)), - int(metrics.AdditionalMetrics.ObservedIntf[i].Direction), - s, record.UdnsCache, - )) - } if metrics.AdditionalMetrics.FlowRtt != 0 { record.TimeFlowRtt = time.Duration(metrics.AdditionalMetrics.FlowRtt) } diff --git a/pkg/model/record_test.go b/pkg/model/record_test.go index 20794a15b..3aa7e70cc 100644 --- a/pkg/model/record_test.go +++ b/pkg/model/record_test.go @@ -39,10 +39,20 @@ func TestRecordBinaryEncoding(t *testing.T) { 0x13, 0x14, 0x15, 0x16, // u32 if_index_first_seen 0x00, 0x00, 0x00, 0x00, // u32 lock 0x02, 0x00, 0x00, 0x00, // u32 sampling - 0x03, // u8 direction_first_seen - 0x33, // u8 errno - 0x60, // u8 dscp - 0x00, 0x00, 0x00, 0x00, 0x00, // 5 bytes padding + 0x03, // u8 direction_first_seen + 0x33, // u8 errno + 0x60, // u8 dscp + 0x02, // u8 nb_observed_intf + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, // observed_direction[6] + 0x00, 0x00, // 2 bytes padding + // observed_intf[6] + 0x07, 0x00, 0x00, 0x00, + 0x08, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, // 4 bytes padding })) require.NoError(t, err) @@ -70,6 +80,9 @@ func TestRecordBinaryEncoding(t *testing.T) { Errno: 0x33, Dscp: 0x60, Sampling: 0x02, + NbObservedIntf: 2, + ObservedIntf: [MaxObservedInterfaces]uint32{7, 8}, + ObservedDirection: [MaxObservedInterfaces]uint8{1, 0}, }, }, *fr) // assert that IP addresses are interpreted as IPv4 addresses @@ -121,16 +134,9 @@ func TestAdditionalMetricsBinaryEncoding(t *testing.T) { 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, - 0x00, 0x00, // 2bytes padding - // observed_intf_t[4] - 0x01, 0x00, 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, // [0]: u8 direction + 3 bytes padding + u32 if_index - 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, // [1]: u8 direction + 3 bytes padding + u32 if_index - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // [2]: u8 direction + 3 bytes padding + u32 if_index - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // [3]: u8 direction + 3 bytes padding + u32 if_index 0x03, 0x00, // u16 eth_protocol - 0x01, // u8 network_events_idx - 0x02, // u8 nb_observed_intf - 0x00, 0x00, 0x00, 0x00, // 4 bytes padding + 0x01, // u8 network_events_idx + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 7 bytes padding } var addmet ebpf.BpfAdditionalMetrics err := binary.Read(bytes.NewReader(b), binary.LittleEndian, &addmet) @@ -167,10 +173,5 @@ func TestAdditionalMetricsBinaryEncoding(t *testing.T) { Dport: 0, ZoneId: 2, }, - NbObservedIntf: 2, - ObservedIntf: [MaxObservedInterfaces]ebpf.BpfObservedIntfT{ - {Direction: 1, IfIndex: 7}, - {Direction: 0, IfIndex: 8}, - }, }, addmet) } diff --git a/pkg/tracer/tracer.go b/pkg/tracer/tracer.go index da2cac8bc..ac8e288c8 100644 --- a/pkg/tracer/tracer.go +++ b/pkg/tracer/tracer.go @@ -887,7 +887,7 @@ func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[ebpf.BpfFlowI return m.legacyLookupAndDeleteMap(met) } log.WithError(err).WithField("flowId", id).Warnf("couldn't lookup/delete flow entry") - met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows").Inc() + met.Errors.WithErrorName("flow-fetcher", "CannotDeleteFlows", metrics.HighSeverity).Inc() continue } flows[id] = model.NewBpfFlowContent(baseMetrics) @@ -911,7 +911,7 @@ func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[ebpf.BpfFlowI return m.legacyLookupAndDeleteMap(met) } log.WithError(err).WithField("flowId", id).Warnf("couldn't lookup/delete additional metrics entry") - met.Errors.WithErrorName("flow-fetcher", "CannotDeleteAdditionalMetric").Inc() + met.Errors.WithErrorName("flow-fetcher", "CannotDeleteAdditionalMetric", metrics.HighSeverity).Inc() continue } flow, found := flows[id] @@ -959,7 +959,7 @@ func (m *FlowFetcher) ReadGlobalCounter(met *metrics.Metrics) { ebpf.BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH: met.NetworkEventsCounter.WithSourceAndReason("network-events", "NetworkEventsErrorsGroupIDMismatch"), ebpf.BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS: met.NetworkEventsCounter.WithSourceAndReason("network-events", "NetworkEventsErrorsFlowMapUpdate"), ebpf.BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD: met.NetworkEventsCounter.WithSourceAndReason("network-events", "NetworkEventsGoodEvent"), - ebpf.BpfGlobalCountersKeyTOBSERVED_INTF_MISSED: met.Errors.WithErrorName("flow-fetcher", "MaxObservedInterfacesReached"), + ebpf.BpfGlobalCountersKeyTOBSERVED_INTF_MISSED: met.Errors.WithErrorName("flow-fetcher", "MaxObservedInterfacesReached", metrics.LowSeverity), } zeroCounters := make([]uint32, cilium.MustPossibleCPU()) for key := ebpf.BpfGlobalCountersKeyT(0); key < ebpf.BpfGlobalCountersKeyTMAX_COUNTERS; key++ { @@ -1723,7 +1723,7 @@ func (p *PacketFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[int][]*byte return p.legacyLookupAndDeleteMap(met) } log.WithError(err).WithField("packetID", id).Warnf("couldn't delete entry") - met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry").Inc() + met.Errors.WithErrorName("pkt-fetcher", "CannotDeleteEntry", metrics.HighSeverity).Inc() } packets[id] = packet } diff --git a/pkg/tracer/tracer_legacy.go b/pkg/tracer/tracer_legacy.go index 51681d7a6..4e5a384a2 100644 --- a/pkg/tracer/tracer_legacy.go +++ b/pkg/tracer/tracer_legacy.go @@ -23,7 +23,7 @@ func (m *FlowFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[ebpf.Bp count++ if err := flowMap.Delete(id); err != nil { log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry") - met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows").Inc() + met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows", metrics.HighSeverity).Inc() } flows[id] = model.NewBpfFlowContent(baseMetrics) } @@ -44,7 +44,7 @@ func (p *PacketFetcher) legacyLookupAndDeleteMap(met *metrics.Metrics) map[int][ for iterator.Next(&id, &packet) { if err := packetMap.Delete(id); err != nil { log.WithError(err).WithField("packetID ", id).Warnf("couldn't delete entry") - met.Errors.WithErrorName("pkt-fetcher-legacy", "CannotDeleteEntry").Inc() + met.Errors.WithErrorName("pkt-fetcher-legacy", "CannotDeleteEntry", metrics.HighSeverity).Inc() } packets[id] = append(packets[id], packet...) }