Skip to content

Commit b044453

Browse files
committed
NETOBSERV-2307: NETOBSERV-2315: fix several IPFIX issues
- Fixed flows without ports that generated errors in logs, and were not exported. It shouldn't matter that ports are missing (e.g. ICMP) - More generally, any missing field won't trigger an error anymore - Some fields were missing: icmp type/code, tcp flags - Fix resending templates in case of collector being restarted
1 parent 19fb27b commit b044453

File tree

3 files changed

+247
-133
lines changed

3 files changed

+247
-133
lines changed

pkg/api/write_ipfix.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,24 @@ package api
22

33
import (
44
"errors"
5+
"time"
56
)
67

78
type WriteIpfix struct {
8-
TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"IPFIX Collector host target IP"`
9-
TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"IPFIX Collector host target port"`
10-
Transport string `yaml:"transport,omitempty" json:"transport,omitempty" doc:"Transport protocol (tcp/udp) to be used for the IPFIX connection"`
11-
EnterpriseID int `yaml:"enterpriseId,omitempty" json:"EnterpriseId,omitempty" doc:"Enterprise ID for exporting transformations"`
9+
TargetHost string `yaml:"targetHost,omitempty" json:"targetHost,omitempty" doc:"IPFIX Collector host target IP"`
10+
TargetPort int `yaml:"targetPort,omitempty" json:"targetPort,omitempty" doc:"IPFIX Collector host target port"`
11+
Transport string `yaml:"transport,omitempty" json:"transport,omitempty" doc:"Transport protocol (tcp/udp) to be used for the IPFIX connection"`
12+
EnterpriseID int `yaml:"enterpriseId,omitempty" json:"EnterpriseId,omitempty" doc:"Enterprise ID for exporting transformations"`
13+
TplSendInterval Duration `yaml:"tplSendInterval,omitempty" json:"tplSendInterval,omitempty" doc:"Interval for resending templates to the collector (default: 1m)"`
1214
}
1315

1416
func (w *WriteIpfix) SetDefaults() {
1517
if w.Transport == "" {
1618
w.Transport = "tcp"
1719
}
20+
if w.TplSendInterval.Duration == 0 {
21+
w.TplSendInterval.Duration = time.Minute
22+
}
1823
}
1924

2025
func (w *WriteIpfix) Validate() error {

pkg/pipeline/write/testnorace/write_ipfix_test.go

Lines changed: 101 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
var (
2626
startTime = time.Now()
2727
endTime = startTime.Add(7 * time.Second)
28-
FullPBFlow = pbflow.Record{
28+
fullPBFlow = pbflow.Record{
2929
Direction: pbflow.Direction_EGRESS,
3030
Bytes: 1024,
3131
DataLink: &pbflow.DataLink{
@@ -79,13 +79,54 @@ var (
7979
},
8080
},
8181
}
82+
83+
icmpPBFlow = pbflow.Record{
84+
Direction: pbflow.Direction_INGRESS,
85+
Bytes: 1024,
86+
DataLink: &pbflow.DataLink{
87+
DstMac: 0x112233445566,
88+
SrcMac: 0x010203040506,
89+
},
90+
Network: &pbflow.Network{
91+
SrcAddr: &pbflow.IP{
92+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304},
93+
},
94+
DstAddr: &pbflow.IP{
95+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
96+
},
97+
},
98+
EthProtocol: 2048,
99+
Packets: 3,
100+
Transport: &pbflow.Transport{
101+
Protocol: 1,
102+
},
103+
TimeFlowStart: timestamppb.New(startTime),
104+
TimeFlowEnd: timestamppb.New(endTime),
105+
106+
AgentIp: &pbflow.IP{
107+
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a090807},
108+
},
109+
Flags: 0x110,
110+
IcmpCode: 10,
111+
IcmpType: 8,
112+
DupList: []*pbflow.DupMapEntry{
113+
{
114+
Interface: "eth0",
115+
Direction: pbflow.Direction_EGRESS,
116+
},
117+
{
118+
Interface: "a1234567",
119+
Direction: pbflow.Direction_INGRESS,
120+
},
121+
},
122+
}
82123
)
83124

84125
func TestEnrichedIPFIXFlow(t *testing.T) {
85126
cp := startCollector(t)
86127
addr := cp.GetAddress().(*net.UDPAddr)
87128

88-
flow := decode.PBFlowToMap(&FullPBFlow)
129+
flow := decode.PBFlowToMap(&fullPBFlow)
89130

90131
// Add enrichment
91132
flow["SrcK8S_Name"] = "pod A"
@@ -146,7 +187,7 @@ func TestEnrichedIPFIXPartialFlow(t *testing.T) {
146187
cp := startCollector(t)
147188
addr := cp.GetAddress().(*net.UDPAddr)
148189

149-
flow := decode.PBFlowToMap(&FullPBFlow)
190+
flow := decode.PBFlowToMap(&fullPBFlow)
150191

151192
// Add partial enrichment
152193
flow["SrcK8S_Name"] = "pod A"
@@ -207,7 +248,7 @@ func TestBasicIPFIXFlow(t *testing.T) {
207248
cp := startCollector(t)
208249
addr := cp.GetAddress().(*net.UDPAddr)
209250

210-
flow := decode.PBFlowToMap(&FullPBFlow)
251+
flow := decode.PBFlowToMap(&fullPBFlow)
211252

212253
// Add partial enrichment (must be ignored)
213254
flow["SrcK8S_Name"] = "pod A"
@@ -264,6 +305,62 @@ func TestBasicIPFIXFlow(t *testing.T) {
264305
}
265306
}
266307

308+
func TestICMPIPFIXFlow(t *testing.T) {
309+
cp := startCollector(t)
310+
addr := cp.GetAddress().(*net.UDPAddr)
311+
312+
flow := decode.PBFlowToMap(&icmpPBFlow)
313+
314+
writer, err := write.NewWriteIpfix(config.StageParam{
315+
Write: &config.Write{
316+
Ipfix: &api.WriteIpfix{
317+
TargetHost: addr.IP.String(),
318+
TargetPort: addr.Port,
319+
Transport: addr.Network(),
320+
// No enterprise ID here
321+
},
322+
},
323+
})
324+
require.NoError(t, err)
325+
326+
writer.Write(flow)
327+
328+
// Read collector
329+
// 1st = IPv4 template
330+
tplv4Msg := <-cp.GetMsgChan()
331+
// 2nd = IPv6 template (ignore)
332+
<-cp.GetMsgChan()
333+
// 3rd = data record
334+
dataMsg := <-cp.GetMsgChan()
335+
cp.Stop()
336+
337+
// Check template
338+
assert.Equal(t, uint16(10), tplv4Msg.GetVersion())
339+
templateSet := tplv4Msg.GetSet()
340+
templateElements := templateSet.GetRecords()[0].GetOrderedElementList()
341+
assert.Len(t, templateElements, len(write.IPv4IANAFields))
342+
assert.Equal(t, uint32(0), templateElements[0].GetInfoElement().EnterpriseId)
343+
344+
// Check data
345+
assert.Equal(t, uint16(10), dataMsg.GetVersion())
346+
dataSet := dataMsg.GetSet()
347+
record := dataSet.GetRecords()[0]
348+
349+
for _, name := range write.IPv4IANAFields {
350+
element, _, exist := record.GetInfoElementWithValue(name)
351+
assert.Truef(t, exist, "element with name %s should exist in the record", name)
352+
assert.NotNil(t, element)
353+
matchElement(t, element, flow)
354+
}
355+
356+
// Make sure enriched fields are absent
357+
for _, name := range write.KubeFields {
358+
element, _, exist := record.GetInfoElementWithValue(name)
359+
assert.Falsef(t, exist, "element with name %s should NOT exist in the record", name)
360+
assert.Nil(t, element)
361+
}
362+
}
363+
267364
//nolint:cyclop
268365
func matchElement(t *testing.T, element entities.InfoElementWithValue, flow config.GenericMap) {
269366
name := element.GetName()

0 commit comments

Comments
 (0)